001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
021import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
022import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
023import static org.apache.hadoop.hbase.master.cleaner.HFileCleaner.CUSTOM_POOL_SIZE;
024import static org.apache.hadoop.hbase.util.DNS.MASTER_HOSTNAME_KEY;
025
026import com.google.errorprone.annotations.RestrictedApi;
027import io.opentelemetry.api.trace.Span;
028import io.opentelemetry.api.trace.StatusCode;
029import io.opentelemetry.context.Scope;
030import java.io.IOException;
031import java.io.InterruptedIOException;
032import java.lang.reflect.Constructor;
033import java.lang.reflect.InvocationTargetException;
034import java.net.InetAddress;
035import java.net.InetSocketAddress;
036import java.net.UnknownHostException;
037import java.time.Instant;
038import java.time.ZoneId;
039import java.time.format.DateTimeFormatter;
040import java.util.ArrayList;
041import java.util.Arrays;
042import java.util.Collection;
043import java.util.Collections;
044import java.util.Comparator;
045import java.util.EnumSet;
046import java.util.HashMap;
047import java.util.HashSet;
048import java.util.Iterator;
049import java.util.LinkedList;
050import java.util.List;
051import java.util.Map;
052import java.util.Objects;
053import java.util.Optional;
054import java.util.Set;
055import java.util.concurrent.ExecutionException;
056import java.util.concurrent.Future;
057import java.util.concurrent.Semaphore;
058import java.util.concurrent.TimeUnit;
059import java.util.concurrent.TimeoutException;
060import java.util.concurrent.atomic.AtomicInteger;
061import java.util.regex.Pattern;
062import java.util.stream.Collectors;
063import javax.servlet.http.HttpServlet;
064import org.apache.commons.lang3.StringUtils;
065import org.apache.hadoop.conf.Configuration;
066import org.apache.hadoop.fs.FSDataInputStream;
067import org.apache.hadoop.fs.FSDataOutputStream;
068import org.apache.hadoop.fs.Path;
069import org.apache.hadoop.hbase.CatalogFamilyFormat;
070import org.apache.hadoop.hbase.Cell;
071import org.apache.hadoop.hbase.CellBuilderFactory;
072import org.apache.hadoop.hbase.CellBuilderType;
073import org.apache.hadoop.hbase.ClusterId;
074import org.apache.hadoop.hbase.ClusterMetrics;
075import org.apache.hadoop.hbase.ClusterMetrics.Option;
076import org.apache.hadoop.hbase.ClusterMetricsBuilder;
077import org.apache.hadoop.hbase.DoNotRetryIOException;
078import org.apache.hadoop.hbase.HBaseIOException;
079import org.apache.hadoop.hbase.HBaseInterfaceAudience;
080import org.apache.hadoop.hbase.HBaseServerBase;
081import org.apache.hadoop.hbase.HConstants;
082import org.apache.hadoop.hbase.HRegionLocation;
083import org.apache.hadoop.hbase.InvalidFamilyOperationException;
084import org.apache.hadoop.hbase.MasterNotRunningException;
085import org.apache.hadoop.hbase.MetaTableAccessor;
086import org.apache.hadoop.hbase.NamespaceDescriptor;
087import org.apache.hadoop.hbase.PleaseHoldException;
088import org.apache.hadoop.hbase.PleaseRestartMasterException;
089import org.apache.hadoop.hbase.RegionMetrics;
090import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
091import org.apache.hadoop.hbase.ScheduledChore;
092import org.apache.hadoop.hbase.ServerMetrics;
093import org.apache.hadoop.hbase.ServerName;
094import org.apache.hadoop.hbase.ServerTask;
095import org.apache.hadoop.hbase.ServerTaskBuilder;
096import org.apache.hadoop.hbase.TableName;
097import org.apache.hadoop.hbase.TableNotDisabledException;
098import org.apache.hadoop.hbase.TableNotFoundException;
099import org.apache.hadoop.hbase.UnknownRegionException;
100import org.apache.hadoop.hbase.client.BalanceRequest;
101import org.apache.hadoop.hbase.client.BalanceResponse;
102import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
103import org.apache.hadoop.hbase.client.CompactionState;
104import org.apache.hadoop.hbase.client.MasterSwitchType;
105import org.apache.hadoop.hbase.client.NormalizeTableFilterParams;
106import org.apache.hadoop.hbase.client.Put;
107import org.apache.hadoop.hbase.client.RegionInfo;
108import org.apache.hadoop.hbase.client.RegionInfoBuilder;
109import org.apache.hadoop.hbase.client.RegionStatesCount;
110import org.apache.hadoop.hbase.client.ResultScanner;
111import org.apache.hadoop.hbase.client.Scan;
112import org.apache.hadoop.hbase.client.TableDescriptor;
113import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
114import org.apache.hadoop.hbase.client.TableState;
115import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
116import org.apache.hadoop.hbase.exceptions.DeserializationException;
117import org.apache.hadoop.hbase.exceptions.MasterStoppedException;
118import org.apache.hadoop.hbase.executor.ExecutorType;
119import org.apache.hadoop.hbase.favored.FavoredNodesManager;
120import org.apache.hadoop.hbase.http.HttpServer;
121import org.apache.hadoop.hbase.http.InfoServer;
122import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
123import org.apache.hadoop.hbase.ipc.RpcServer;
124import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
125import org.apache.hadoop.hbase.log.HBaseMarkers;
126import org.apache.hadoop.hbase.master.MasterRpcServices.BalanceSwitchMode;
127import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
128import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure;
129import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
130import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
131import org.apache.hadoop.hbase.master.assignment.RegionStates;
132import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
133import org.apache.hadoop.hbase.master.balancer.BalancerChore;
134import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
135import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
136import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
137import org.apache.hadoop.hbase.master.balancer.LoadBalancerStateStore;
138import org.apache.hadoop.hbase.master.balancer.MaintenanceLoadBalancer;
139import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
140import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
141import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
142import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner;
143import org.apache.hadoop.hbase.master.cleaner.SnapshotCleanerChore;
144import org.apache.hadoop.hbase.master.hbck.HbckChore;
145import org.apache.hadoop.hbase.master.http.MasterDumpServlet;
146import org.apache.hadoop.hbase.master.http.MasterRedirectServlet;
147import org.apache.hadoop.hbase.master.http.MasterStatusServlet;
148import org.apache.hadoop.hbase.master.http.api_v1.ResourceConfigFactory;
149import org.apache.hadoop.hbase.master.http.hbck.HbckConfigFactory;
150import org.apache.hadoop.hbase.master.janitor.CatalogJanitor;
151import org.apache.hadoop.hbase.master.locking.LockManager;
152import org.apache.hadoop.hbase.master.migrate.RollingUpgradeChore;
153import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
154import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager;
155import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerStateStore;
156import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
157import org.apache.hadoop.hbase.master.procedure.DeleteNamespaceProcedure;
158import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
159import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
160import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
161import org.apache.hadoop.hbase.master.procedure.FlushTableProcedure;
162import org.apache.hadoop.hbase.master.procedure.InitMetaProcedure;
163import org.apache.hadoop.hbase.master.procedure.LogRollProcedure;
164import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
165import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
166import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
167import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
168import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedureRunnable;
169import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
170import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
171import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
172import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
173import org.apache.hadoop.hbase.master.procedure.ReloadQuotasProcedure;
174import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
175import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
176import org.apache.hadoop.hbase.master.procedure.TruncateRegionProcedure;
177import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
178import org.apache.hadoop.hbase.master.region.MasterRegion;
179import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
180import org.apache.hadoop.hbase.master.replication.AbstractPeerProcedure;
181import org.apache.hadoop.hbase.master.replication.AddPeerProcedure;
182import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
183import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
184import org.apache.hadoop.hbase.master.replication.MigrateReplicationQueueFromZkToTableProcedure;
185import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
186import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
187import org.apache.hadoop.hbase.master.replication.ReplicationPeerModificationStateStore;
188import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
189import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure;
190import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
191import org.apache.hadoop.hbase.master.slowlog.SlowLogMasterService;
192import org.apache.hadoop.hbase.master.snapshot.SnapshotCleanupStateStore;
193import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
194import org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator;
195import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer;
196import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
197import org.apache.hadoop.hbase.mob.MobFileCleanerChore;
198import org.apache.hadoop.hbase.mob.MobFileCompactionChore;
199import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
200import org.apache.hadoop.hbase.monitoring.MonitoredTask;
201import org.apache.hadoop.hbase.monitoring.TaskGroup;
202import org.apache.hadoop.hbase.monitoring.TaskMonitor;
203import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
204import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
205import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
206import org.apache.hadoop.hbase.procedure2.LockedResource;
207import org.apache.hadoop.hbase.procedure2.Procedure;
208import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
209import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
210import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
211import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
212import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
213import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
214import org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore;
215import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
216import org.apache.hadoop.hbase.quotas.MasterQuotasObserver;
217import org.apache.hadoop.hbase.quotas.QuotaObserverChore;
218import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
219import org.apache.hadoop.hbase.quotas.QuotaUtil;
220import org.apache.hadoop.hbase.quotas.SnapshotQuotaObserverChore;
221import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
222import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
223import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifier;
224import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifierFactory;
225import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
226import org.apache.hadoop.hbase.regionserver.HRegionServer;
227import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
228import org.apache.hadoop.hbase.regionserver.storefiletracker.ModifyColumnFamilyStoreFileTrackerProcedure;
229import org.apache.hadoop.hbase.regionserver.storefiletracker.ModifyTableStoreFileTrackerProcedure;
230import org.apache.hadoop.hbase.replication.ReplicationException;
231import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
232import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
233import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
234import org.apache.hadoop.hbase.replication.ReplicationUtils;
235import org.apache.hadoop.hbase.replication.SyncReplicationState;
236import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
237import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
238import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
239import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
240import org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator;
241import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
242import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp.ReplicationSyncUpToolInfo;
243import org.apache.hadoop.hbase.rsgroup.RSGroupAdminEndpoint;
244import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
245import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager;
246import org.apache.hadoop.hbase.rsgroup.RSGroupUtil;
247import org.apache.hadoop.hbase.security.AccessDeniedException;
248import org.apache.hadoop.hbase.security.SecurityConstants;
249import org.apache.hadoop.hbase.security.Superusers;
250import org.apache.hadoop.hbase.security.UserProvider;
251import org.apache.hadoop.hbase.trace.TraceUtil;
252import org.apache.hadoop.hbase.util.Addressing;
253import org.apache.hadoop.hbase.util.Bytes;
254import org.apache.hadoop.hbase.util.CommonFSUtils;
255import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
256import org.apache.hadoop.hbase.util.DNS;
257import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
258import org.apache.hadoop.hbase.util.FSTableDescriptors;
259import org.apache.hadoop.hbase.util.FutureUtils;
260import org.apache.hadoop.hbase.util.HBaseFsck;
261import org.apache.hadoop.hbase.util.HFileArchiveUtil;
262import org.apache.hadoop.hbase.util.IdLock;
263import org.apache.hadoop.hbase.util.JVMClusterUtil;
264import org.apache.hadoop.hbase.util.JsonMapper;
265import org.apache.hadoop.hbase.util.ModifyRegionUtils;
266import org.apache.hadoop.hbase.util.Pair;
267import org.apache.hadoop.hbase.util.ReflectionUtils;
268import org.apache.hadoop.hbase.util.RetryCounter;
269import org.apache.hadoop.hbase.util.RetryCounterFactory;
270import org.apache.hadoop.hbase.util.TableDescriptorChecker;
271import org.apache.hadoop.hbase.util.Threads;
272import org.apache.hadoop.hbase.util.VersionInfo;
273import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
274import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
275import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
276import org.apache.hadoop.hbase.zookeeper.ZKUtil;
277import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
278import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
279import org.apache.yetus.audience.InterfaceAudience;
280import org.apache.zookeeper.KeeperException;
281import org.slf4j.Logger;
282import org.slf4j.LoggerFactory;
283
284import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
285import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
286import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
287import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
288import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
289import org.apache.hbase.thirdparty.com.google.gson.JsonParseException;
290import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
291import org.apache.hbase.thirdparty.com.google.protobuf.Service;
292import org.apache.hbase.thirdparty.org.eclipse.jetty.ee8.servlet.ServletHolder;
293import org.apache.hbase.thirdparty.org.eclipse.jetty.ee8.webapp.WebAppContext;
294import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server;
295import org.apache.hbase.thirdparty.org.eclipse.jetty.server.ServerConnector;
296import org.apache.hbase.thirdparty.org.glassfish.jersey.server.ResourceConfig;
297import org.apache.hbase.thirdparty.org.glassfish.jersey.servlet.ServletContainer;
298
299import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
300import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
301import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
302
303/**
304 * HMaster is the "master server" for HBase. An HBase cluster has one active master. If many masters
305 * are started, all compete. Whichever wins goes on to run the cluster. All others park themselves
306 * in their constructor until master or cluster shutdown or until the active master loses its lease
307 * in zookeeper. Thereafter, all running master jostle to take over master role.
308 * <p/>
 * The Master can be asked to shut down the cluster. See {@link #shutdown()}. In this case it will tell
310 * all regionservers to go down and then wait on them all reporting in that they are down. This
311 * master will then shut itself down.
312 * <p/>
313 * You can also shutdown just this master. Call {@link #stopMaster()}.
314 * @see org.apache.zookeeper.Watcher
315 */
316@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
317public class HMaster extends HBaseServerBase<MasterRpcServices> implements MasterServices {
318
  private static final Logger LOG = LoggerFactory.getLogger(HMaster.class);

  // MASTER is the name of the webapp and the attribute name used when stuffing this
  // instance into a web context !! AND OTHER PLACES !!
  public static final String MASTER = "master";

  // Manager and zk listener for master election
  private final ActiveMasterManager activeMasterManager;
  // Region server tracker
  private final RegionServerTracker regionServerTracker;
  // Draining region server tracker
  private DrainingServerTracker drainingServerTracker;
  // Tracker for load balancer state
  LoadBalancerStateStore loadBalancerStateStore;
  // Tracker for meta location, if any client ZK quorum specified
  private MetaLocationSyncer metaLocationSyncer;
  // Tracker for active master location, if any client ZK quorum specified
  @InterfaceAudience.Private
  MasterAddressSyncer masterAddressSyncer;
  // Tracker for auto snapshot cleanup state
  SnapshotCleanupStateStore snapshotCleanupStateStore;

  // Tracker for split and merge state
  private SplitOrMergeStateStore splitOrMergeStateStore;

  // Service handling namespace/table schema operations
  private ClusterSchemaService clusterSchemaService;

  // How long (seconds) to wait for dependent services to come up before giving up
  public static final String HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS =
    "hbase.master.wait.on.service.seconds";
  public static final int DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS = 5 * 60;

  // Interval between cleaner chore runs, in milliseconds
  public static final String HBASE_MASTER_CLEANER_INTERVAL = "hbase.master.cleaner.interval";

  public static final int DEFAULT_HBASE_MASTER_CLEANER_INTERVAL = 600 * 1000;

  // Cluster id of the cluster this master is serving; set once we become active
  private String clusterId;

  // Metrics for the HMaster
  final MetricsMaster metricsMaster;
  // file system manager for the master FS operations
  private MasterFileSystem fileSystemManager;
  private MasterWalManager walManager;

  // manager to manage procedure-based WAL splitting, can be null if current
  // is zk-based WAL splitting. SplitWALManager will replace SplitLogManager
  // and MasterWalManager, which means zk-based WAL splitting code will be
  // useless after we switch to the procedure-based one. our eventual goal
  // is to remove all the zk-based WAL splitting code.
  private SplitWALManager splitWALManager;

  // server manager to deal with region server info
  private volatile ServerManager serverManager;

  // manager of assignment nodes in zookeeper
  private AssignmentManager assignmentManager;

  // manager of region server groups (rsgroup feature)
  private RSGroupInfoManager rsGroupInfoManager;

  // Coordinates the replication log cleaner with replication peer modifications
  private final ReplicationLogCleanerBarrier replicationLogCleanerBarrier =
    new ReplicationLogCleanerBarrier();

  // Only allow to add one sync replication peer concurrently
  private final Semaphore syncReplicationPeerLock = new Semaphore(1);

  // manager of replication
  private ReplicationPeerManager replicationPeerManager;

  // manager for replaying remote WALs during sync replication
  private SyncReplicationReplayWALManager syncReplicationReplayWALManager;

  // buffer for "fatal error" notices from region servers
  // in the cluster. This is only used for assisting
  // operations/debugging.
  MemoryBoundedLogMessageBuffer rsFatals;

  // flag set after we become the active master (used for testing)
  private volatile boolean activeMaster = false;

  // flag set after we complete initialization once active
  private final ProcedureEvent<?> initialized = new ProcedureEvent<>("master initialized");

  // flag set after master services are started,
  // initialization may have not completed yet.
  volatile boolean serviceStarted = false;

  // Maximum time we should run balancer for
  private final int maxBalancingTime;
  // Maximum percent of regions in transition when balancing
  private final double maxRitPercent;

  // Manager for table/region/heartbeat-style locks taken via procedures
  private final LockManager lockManager = new LockManager(this);

  // Load balancer and the chore that periodically invokes it
  private RSGroupBasedLoadBalancer balancer;
  private BalancerChore balancerChore;
  private static boolean disableBalancerChoreForTest = false;
  private RegionNormalizerManager regionNormalizerManager;
  private ClusterStatusChore clusterStatusChore;
  private ClusterStatusPublisher clusterStatusPublisherChore = null;
  private SnapshotCleanerChore snapshotCleanerChore = null;

  private HbckChore hbckChore;
  CatalogJanitor catalogJanitorChore;
  // Threadpool for scanning the Old logs directory, used by the LogCleaner
  private DirScanPool logCleanerPool;
  private LogCleaner logCleaner;
  // HFile cleaners for the custom hfile archive paths and the default archive path
  // The archive path cleaner is the first element
  private List<HFileCleaner> hfileCleaners = new ArrayList<>();
  // The hfile cleaner paths, including custom paths and the default archive path
  private List<Path> hfileCleanerPaths = new ArrayList<>();
  // The shared hfile cleaner pool for the custom archive paths
  private DirScanPool sharedHFileCleanerPool;
  // The exclusive hfile cleaner pool for scanning the archive directory
  private DirScanPool exclusiveHFileCleanerPool;
  private ReplicationBarrierCleaner replicationBarrierCleaner;
  private MobFileCleanerChore mobFileCleanerChore;
  private MobFileCompactionChore mobFileCompactionChore;
  private RollingUpgradeChore rollingUpgradeChore;
  // used to synchronize the mobCompactionStates
  private final IdLock mobCompactionLock = new IdLock();
  // save the information of mob compactions in tables.
  // the key is table name, the value is the number of compactions in that table.
  private Map<TableName, AtomicInteger> mobCompactionStates = Maps.newConcurrentMap();

  // Coprocessor host for master coprocessors; volatile because it is published after startup
  volatile MasterCoprocessorHost cpHost;

  private final boolean preLoadTableDescriptors;

  // Time stamps for when a hmaster became active
  private long masterActiveTime;

  // Time stamp for when HMaster finishes becoming Active Master
  private long masterFinishedInitializationTime;

  // Coprocessor endpoint services registered on the master, keyed by service name
  Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  // monitor for snapshot of hbase tables
  SnapshotManager snapshotManager;
  // monitor for distributed procedures
  private MasterProcedureManagerHost mpmHost;

  private RegionsRecoveryChore regionsRecoveryChore = null;

  private RegionsRecoveryConfigManager regionsRecoveryConfigManager = null;
  // it is assigned after 'initialized' guard set to true, so should be volatile
  private volatile MasterQuotaManager quotaManager;
  private SpaceQuotaSnapshotNotifier spaceQuotaSnapshotNotifier;
  private QuotaObserverChore quotaObserverChore;
  private SnapshotQuotaObserverChore snapshotQuotaChore;
  private OldWALsDirSizeChore oldWALsDirSizeChore;

  // Executor and backing store for all master procedures
  private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
  private ProcedureStore procedureStore;

  // the master local storage to store procedure data, meta region locations, etc.
  private MasterRegion masterRegion;

  // Persistent list of known region servers (backed by the master local region)
  private RegionServerList rsListStorage;

  // handle table states
  private TableStateManager tableStateManager;

  /** jetty server for master to redirect requests to regionserver infoServer */
  private Server masterJettyServer;

  // Determine if we should do normal startup or minimal "single-user" mode with no region
  // servers and no user tables. Useful for repair and recovery of hbase:meta
  private final boolean maintenanceMode;
  static final String MAINTENANCE_MODE = "hbase.master.maintenance_mode";

  // the in process region server for carry system regions in maintenanceMode
  private JVMClusterUtil.RegionServerThread maintenanceRegionServer;

  // Cached clusterId on stand by masters to serve clusterID requests from clients.
  private final CachedClusterId cachedClusterId;

  public static final String WARMUP_BEFORE_MOVE = "hbase.master.warmup.before.move";
  private static final boolean DEFAULT_WARMUP_BEFORE_MOVE = true;

  /**
   * Use RSProcedureDispatcher instance to initiate master -> rs remote procedure execution. Use
   * this config to extend RSProcedureDispatcher (mainly for testing purpose).
   */
  public static final String HBASE_MASTER_RSPROC_DISPATCHER_CLASS =
    "hbase.master.rsproc.dispatcher.class";
  private static final String DEFAULT_HBASE_MASTER_RSPROC_DISPATCHER_CLASS =
    RSProcedureDispatcher.class.getName();

  // Task group that tracks the progress of master startup on the web UI
  private TaskGroup startupTaskGroup;

  /**
   * Store whether we allow replication peer modification operations.
   */
  private ReplicationPeerModificationStateStore replicationPeerModificationStateStore;
512
513  /**
514   * Initializes the HMaster. The steps are as follows:
515   * <p>
516   * <ol>
517   * <li>Initialize the local HRegionServer
518   * <li>Start the ActiveMasterManager.
519   * </ol>
520   * <p>
521   * Remaining steps of initialization occur in {@link #finishActiveMasterInitialization()} after
522   * the master becomes the active one.
523   */
  public HMaster(final Configuration conf) throws IOException {
    super(conf, "Master");
    // Trace construction so startup failures are attributable in traces as well as logs.
    final Span span = TraceUtil.createSpan("HMaster.cxtor");
    try (Scope ignored = span.makeCurrent()) {
      // Maintenance mode can be requested via the hbase configuration or via a JVM system
      // property of the same name; the configuration takes precedence.
      // NOTE(review): Boolean.getBoolean reads a JVM system property, although the log
      // message below says "environment variables" — confirm which is intended.
      if (conf.getBoolean(MAINTENANCE_MODE, false)) {
        LOG.info("Detected {}=true via configuration.", MAINTENANCE_MODE);
        maintenanceMode = true;
      } else if (Boolean.getBoolean(MAINTENANCE_MODE)) {
        LOG.info("Detected {}=true via environment variables.", MAINTENANCE_MODE);
        maintenanceMode = true;
      } else {
        maintenanceMode = false;
      }
      // Ring buffer of region server fatal error messages, bounded to ~1MB by default.
      this.rsFatals = new MemoryBoundedLogMessageBuffer(
        conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024));
      LOG.info("hbase.rootdir={}, hbase.cluster.distributed={}",
        CommonFSUtils.getRootDir(this.conf),
        this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));

      // Disable usage of meta replicas in the master
      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);

      // Injects master-specific settings (e.g. cleaner plugins) into the shared config.
      decorateMasterConfiguration(this.conf);

      // Hack! Maps DFSClient => Master for logs. HDFS made this
      // config param for task trackers, but we can piggyback off of it.
      if (this.conf.get("mapreduce.task.attempt.id") == null) {
        this.conf.set("mapreduce.task.attempt.id", "hb_m_" + this.serverName.toString());
      }

      this.metricsMaster = new MetricsMaster(new MetricsMasterWrapperImpl(this));

      // preload table descriptor at startup
      this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);

      // Balancer limits: maximum run time per invocation and maximum tolerated
      // fraction of regions-in-transition while balancing.
      this.maxBalancingTime = getMaxBalancingTime();
      this.maxRitPercent = conf.getDouble(HConstants.HBASE_MASTER_BALANCER_MAX_RIT_PERCENT,
        HConstants.DEFAULT_HBASE_MASTER_BALANCER_MAX_RIT_PERCENT);

      // Do we publish the status?
      boolean shouldPublish =
        conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT);
      Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
        conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
          ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
          ClusterStatusPublisher.Publisher.class);

      if (shouldPublish) {
        if (publisherClass == null) {
          // Defensive: getClass only returns null if the default itself is null.
          LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but "
            + ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS
            + " is not set - not publishing status");
        } else {
          clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
          LOG.debug("Created {}", this.clusterStatusPublisherChore);
          getChoreService().scheduleChore(clusterStatusPublisherChore);
        }
      }
      // Set up master election and region server tracking over zookeeper, then start RPC.
      this.activeMasterManager = createActiveMasterManager(zooKeeper, serverName, this);
      cachedClusterId = new CachedClusterId(this, conf);
      this.regionServerTracker = new RegionServerTracker(zooKeeper, this);
      this.rpcServices.start(zooKeeper);
      span.setStatus(StatusCode.OK);
    } catch (Throwable t) {
      // Make sure we log the exception. HMaster is often started via reflection and the
      // cause of failed startup is lost.
      TraceUtil.setError(span, t);
      LOG.error("Failed construction of Master", t);
      throw t;
    } finally {
      span.end();
    }
  }
597
598  /**
599   * Protected to have custom implementations in tests override the default ActiveMaster
600   * implementation.
601   */
602  protected ActiveMasterManager createActiveMasterManager(ZKWatcher zk, ServerName sn,
603    org.apache.hadoop.hbase.Server server) throws InterruptedIOException {
604    return new ActiveMasterManager(zk, sn, server);
605  }
606
607  @Override
608  protected String getUseThisHostnameInstead(Configuration conf) {
609    return conf.get(MASTER_HOSTNAME_KEY);
610  }
611
612  @Override
613  protected DNS.ServerType getDNSServerType() {
614    return DNS.ServerType.MASTER;
615  }
616
  // Subscribe the components that react to on-the-fly configuration changes: the RPC services
  // and this master itself.
  private void registerConfigurationObservers() {
    configurationManager.registerObserver(this.rpcServices);
    configurationManager.registerObserver(this);
  }
621
  // Main run loop. Calls through to the regionserver run loop AFTER becoming active Master; will
  // block in here until then.
  @Override
  public void run() {
    try {
      installShutdownHook();
      registerConfigurationObservers();
      // Race for active mastership on a separate daemon thread so this thread can park in the
      // sleep loop below until a stop/abort is requested.
      Threads.setDaemonThreadRunning(new Thread(TraceUtil.tracedRunnable(() -> {
        try {
          int infoPort = putUpJettyServer();
          startActiveMasterManager(infoPort);
        } catch (Throwable t) {
          // Make sure we log the exception.
          String error = "Failed to become Active Master";
          LOG.error(error, t);
          // Abort should have been called already.
          if (!isAborted()) {
            abort(error, t);
          }
        }
      }, "HMaster.becomeActiveMaster")), getName() + ":becomeActiveMaster");
      // Block here until the master is stopped or aborted; actual master work runs on other
      // threads (chores, procedures, RPC handlers).
      while (!isStopped() && !isAborted()) {
        sleeper.sleep();
      }
      // Orderly teardown, traced as its own span: info server first, then connections,
      // service threads, RPC, and finally ZooKeeper and table descriptors.
      final Span span = TraceUtil.createSpan("HMaster exiting main loop");
      try (Scope ignored = span.makeCurrent()) {
        stopInfoServer();
        closeClusterConnection();
        stopServiceThreads();
        if (this.rpcServices != null) {
          this.rpcServices.stop();
        }
        closeZooKeeper();
        closeTableDescriptors();
        span.setStatus(StatusCode.OK);
      } finally {
        span.end();
      }
    } finally {
      // Runs even if teardown above throws: stop the cluster schema service with a bounded wait
      // and drop the active-master flag.
      if (this.clusterSchemaService != null) {
        // If on way out, then we are no longer active master.
        this.clusterSchemaService.stopAsync();
        try {
          this.clusterSchemaService
            .awaitTerminated(getConfiguration().getInt(HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS,
              DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS), TimeUnit.SECONDS);
        } catch (TimeoutException te) {
          LOG.warn("Failed shutdown of clusterSchemaService", te);
        }
      }
      this.activeMaster = false;
    }
  }
675
676  // return the actual infoPort, -1 means disable info server.
677  private int putUpJettyServer() throws IOException {
678    if (!conf.getBoolean("hbase.master.infoserver.redirect", true)) {
679      return -1;
680    }
681    final int infoPort =
682      conf.getInt("hbase.master.info.port.orig", HConstants.DEFAULT_MASTER_INFOPORT);
683    // -1 is for disabling info server, so no redirecting
684    if (infoPort < 0 || infoServer == null) {
685      return -1;
686    }
687    if (infoPort == infoServer.getPort()) {
688      // server is already running
689      return infoPort;
690    }
691    final String addr = conf.get("hbase.master.info.bindAddress", "0.0.0.0");
692    if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
693      String msg = "Failed to start redirecting jetty server. Address " + addr
694        + " does not belong to this host. Correct configuration parameter: "
695        + "hbase.master.info.bindAddress";
696      LOG.error(msg);
697      throw new IOException(msg);
698    }
699
700    // TODO I'm pretty sure we could just add another binding to the InfoServer run by
701    // the RegionServer and have it run the RedirectServlet instead of standing up
702    // a second entire stack here.
703    masterJettyServer = new Server();
704    final ServerConnector connector = new ServerConnector(masterJettyServer);
705    connector.setHost(addr);
706    connector.setPort(infoPort);
707    masterJettyServer.addConnector(connector);
708    masterJettyServer.setStopAtShutdown(true);
709    masterJettyServer.setHandler(HttpServer.buildGzipHandler(masterJettyServer.getHandler()));
710
711    final String redirectHostname =
712      StringUtils.isBlank(useThisHostnameInstead) ? null : useThisHostnameInstead;
713
714    final MasterRedirectServlet redirect = new MasterRedirectServlet(infoServer, redirectHostname);
715    final WebAppContext context =
716      new WebAppContext(null, "/", null, null, null, null, WebAppContext.NO_SESSIONS);
717    context.addServlet(new ServletHolder(redirect), "/*");
718    context.setServer(masterJettyServer);
719
720    try {
721      masterJettyServer.start();
722    } catch (Exception e) {
723      throw new IOException("Failed to start redirecting jetty server", e);
724    }
725    return connector.getLocalPort();
726  }
727
728  /**
729   * For compatibility, if failed with regionserver credentials, try the master one
730   */
731  @Override
732  protected void login(UserProvider user, String host) throws IOException {
733    try {
734      user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
735        SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
736    } catch (IOException ie) {
737      user.login(SecurityConstants.MASTER_KRB_KEYTAB_FILE, SecurityConstants.MASTER_KRB_PRINCIPAL,
738        host);
739    }
740  }
741
742  public MasterRpcServices getMasterRpcServices() {
743    return rpcServices;
744  }
745
746  @Override
747  protected MasterCoprocessorHost getCoprocessorHost() {
748    return getMasterCoprocessorHost();
749  }
750
751  public boolean balanceSwitch(final boolean b) throws IOException {
752    return getMasterRpcServices().switchBalancer(b, BalanceSwitchMode.ASYNC);
753  }
754
755  @Override
756  protected String getProcessName() {
757    return MASTER;
758  }
759
760  @Override
761  protected boolean canCreateBaseZNode() {
762    return true;
763  }
764
765  @Override
766  protected boolean canUpdateTableDescriptor() {
767    return true;
768  }
769
770  @Override
771  protected boolean cacheTableDescriptor() {
772    return true;
773  }
774
775  protected MasterRpcServices createRpcServices() throws IOException {
776    return new MasterRpcServices(this);
777  }
778
779  @Override
780  protected void configureInfoServer(InfoServer infoServer) {
781    infoServer.addUnprivilegedServlet("master-status", "/master-status", MasterStatusServlet.class);
782    infoServer.addUnprivilegedServlet("api_v1", "/api/v1/*", buildApiV1Servlet());
783    infoServer.addUnprivilegedServlet("hbck", "/hbck/*", buildHbckServlet());
784
785    infoServer.setAttribute(MASTER, this);
786  }
787
788  private ServletHolder buildApiV1Servlet() {
789    final ResourceConfig config = ResourceConfigFactory.createResourceConfig(conf, this);
790    return new ServletHolder(new ServletContainer(config));
791  }
792
793  private ServletHolder buildHbckServlet() {
794    final ResourceConfig config = HbckConfigFactory.createResourceConfig(conf, this);
795    return new ServletHolder(new ServletContainer(config));
796  }
797
798  @Override
799  protected Class<? extends HttpServlet> getDumpServlet() {
800    return MasterDumpServlet.class;
801  }
802
803  @Override
804  public MetricsMaster getMasterMetrics() {
805    return metricsMaster;
806  }
807
808  /**
809   * Initialize all ZK based system trackers. But do not include {@link RegionServerTracker}, it
810   * should have already been initialized along with {@link ServerManager}.
811   */
812  private void initializeZKBasedSystemTrackers()
813    throws IOException, KeeperException, ReplicationException, DeserializationException {
814    if (maintenanceMode) {
815      // in maintenance mode, always use MaintenanceLoadBalancer.
816      conf.unset(LoadBalancer.HBASE_RSGROUP_LOADBALANCER_CLASS);
817      conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, MaintenanceLoadBalancer.class,
818        LoadBalancer.class);
819    }
820    this.balancer = new RSGroupBasedLoadBalancer();
821    this.loadBalancerStateStore = new LoadBalancerStateStore(masterRegion, zooKeeper);
822
823    this.regionNormalizerManager =
824      RegionNormalizerFactory.createNormalizerManager(conf, masterRegion, zooKeeper, this);
825    this.configurationManager.registerObserver(regionNormalizerManager);
826    this.regionNormalizerManager.start();
827
828    this.splitOrMergeStateStore = new SplitOrMergeStateStore(masterRegion, zooKeeper, conf);
829
830    // This is for backwards compatible. We do not need the CP for rs group now but if user want to
831    // load it, we need to enable rs group.
832    String[] cpClasses = conf.getStrings(MasterCoprocessorHost.MASTER_COPROCESSOR_CONF_KEY);
833    if (cpClasses != null) {
834      for (String cpClass : cpClasses) {
835        if (RSGroupAdminEndpoint.class.getName().equals(cpClass)) {
836          RSGroupUtil.enableRSGroup(conf);
837          break;
838        }
839      }
840    }
841    this.rsGroupInfoManager = RSGroupInfoManager.create(this);
842
843    this.replicationPeerManager = ReplicationPeerManager.create(this, clusterId);
844    this.configurationManager.registerObserver(replicationPeerManager);
845    this.replicationPeerModificationStateStore =
846      new ReplicationPeerModificationStateStore(masterRegion);
847
848    this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
849    this.drainingServerTracker.start();
850
851    this.snapshotCleanupStateStore = new SnapshotCleanupStateStore(masterRegion, zooKeeper);
852
853    String clientQuorumServers = conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
854    boolean clientZkObserverMode = conf.getBoolean(HConstants.CLIENT_ZOOKEEPER_OBSERVER_MODE,
855      HConstants.DEFAULT_CLIENT_ZOOKEEPER_OBSERVER_MODE);
856    if (clientQuorumServers != null && !clientZkObserverMode) {
857      // we need to take care of the ZK information synchronization
858      // if given client ZK are not observer nodes
859      ZKWatcher clientZkWatcher = new ZKWatcher(conf,
860        getProcessName() + ":" + rpcServices.getSocketAddress().getPort() + "-clientZK", this,
861        false, true);
862      this.metaLocationSyncer = new MetaLocationSyncer(zooKeeper, clientZkWatcher, this);
863      this.metaLocationSyncer.start();
864      this.masterAddressSyncer = new MasterAddressSyncer(zooKeeper, clientZkWatcher, this);
865      this.masterAddressSyncer.start();
866      // set cluster id is a one-go effort
867      ZKClusterId.setClusterId(clientZkWatcher, fileSystemManager.getClusterId());
868    }
869
870    // Set the cluster as up. If new RSs, they'll be waiting on this before
871    // going ahead with their startup.
872    boolean wasUp = this.clusterStatusTracker.isClusterUp();
873    if (!wasUp) this.clusterStatusTracker.setClusterUp();
874
875    LOG.info("Active/primary master=" + this.serverName + ", sessionid=0x"
876      + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())
877      + ", setting cluster-up flag (Was=" + wasUp + ")");
878
879    // create/initialize the snapshot manager and other procedure managers
880    this.snapshotManager = new SnapshotManager();
881    this.mpmHost = new MasterProcedureManagerHost();
882    this.mpmHost.register(this.snapshotManager);
883    this.mpmHost.register(new MasterFlushTableProcedureManager());
884    this.mpmHost.loadProcedures(conf);
885    this.mpmHost.initialize(this, this.metricsMaster);
886  }
887
  // Will be overridden in tests to inject a customized AssignmentManager.
  @InterfaceAudience.Private
  protected AssignmentManager createAssignmentManager(MasterServices master,
    MasterRegion masterRegion) {
    return new AssignmentManager(master, masterRegion);
  }
894
  /**
   * One-time migration of meta region locations from ZooKeeper znodes into the master local
   * region's catalog family. No-op when the catalog family already holds data (migration done) or
   * when ZooKeeper has no meta location znodes.
   */
  private void tryMigrateMetaLocationsFromZooKeeper() throws IOException, KeeperException {
    // try migrate data from zookeeper
    try (ResultScanner scanner =
      masterRegion.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY))) {
      if (scanner.next() != null) {
        // notice that all replicas for a region are in the same row, so the migration can be
        // done with in a one row put, which means if we have data in catalog family then we can
        // make sure that the migration is done.
        LOG.info("The {} family in master local region already has data in it, skip migrating...",
          HConstants.CATALOG_FAMILY_STR);
        return;
      }
    }
    // start migrating
    byte[] row = CatalogFamilyFormat.getMetaKeyForRegion(RegionInfoBuilder.FIRST_META_REGIONINFO);
    Put put = new Put(row);
    List<String> metaReplicaNodes = zooKeeper.getMetaReplicaNodes();
    StringBuilder info = new StringBuilder("Migrating meta locations:");
    // Fold every replica's location and state from ZK into the single meta row.
    for (String metaReplicaNode : metaReplicaNodes) {
      int replicaId = zooKeeper.getZNodePaths().getMetaReplicaIdFromZNode(metaReplicaNode);
      RegionState state = MetaTableLocator.getMetaRegionState(zooKeeper, replicaId);
      info.append(" ").append(state);
      // Use the znode's timestamp for all cells so the migrated row reflects the ZK state's age.
      put.setTimestamp(state.getStamp());
      MetaTableAccessor.addRegionInfo(put, state.getRegion());
      if (state.getServerName() != null) {
        MetaTableAccessor.addLocation(put, state.getServerName(), HConstants.NO_SEQNUM, replicaId);
      }
      // Also persist the region state itself in the replica's state column.
      put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
        .setFamily(HConstants.CATALOG_FAMILY)
        .setQualifier(RegionStateStore.getStateColumn(replicaId)).setTimestamp(put.getTimestamp())
        .setType(Cell.Type.Put).setValue(Bytes.toBytes(state.getState().name())).build());
    }
    if (!put.isEmpty()) {
      LOG.info(info.toString());
      masterRegion.update(r -> r.put(put));
    } else {
      LOG.info("No meta location available on zookeeper, skip migrating...");
    }
  }
934
935  /**
936   * Finish initialization of HMaster after becoming the primary master.
937   * <p/>
938   * The startup order is a bit complicated but very important, do not change it unless you know
939   * what you are doing.
940   * <ol>
941   * <li>Initialize file system based components - file system manager, wal manager, table
942   * descriptors, etc</li>
943   * <li>Publish cluster id</li>
944   * <li>Here comes the most complicated part - initialize server manager, assignment manager and
945   * region server tracker
946   * <ol type='i'>
947   * <li>Create server manager</li>
948   * <li>Create master local region</li>
949   * <li>Create procedure executor, load the procedures, but do not start workers. We will start it
950   * later after we finish scheduling SCPs to avoid scheduling duplicated SCPs for the same
951   * server</li>
952   * <li>Create assignment manager and start it, load the meta region state, but do not load data
953   * from meta region</li>
954   * <li>Start region server tracker, construct the online servers set and find out dead servers and
955   * schedule SCP for them. The online servers will be constructed by scanning zk, and we will also
956   * scan the wal directory and load from master local region to find out possible live region
957   * servers, and the differences between these two sets are the dead servers</li>
958   * </ol>
959   * </li>
   * <li>If this is a new deploy, schedule an InitMetaProcedure to initialize meta</li>
961   * <li>Start necessary service threads - balancer, catalog janitor, executor services, and also
962   * the procedure executor, etc. Notice that the balancer must be created first as assignment
963   * manager may use it when assigning regions.</li>
964   * <li>Wait for meta to be initialized if necessary, start table state manager.</li>
965   * <li>Wait for enough region servers to check-in</li>
966   * <li>Let assignment manager load data from meta and construct region states</li>
967   * <li>Start all other things such as chore services, etc</li>
968   * </ol>
969   * <p/>
970   * Notice that now we will not schedule a special procedure to make meta online(unless the first
971   * time where meta has not been created yet), we will rely on SCP to bring meta online.
972   */
973  private void finishActiveMasterInitialization() throws IOException, InterruptedException,
974    KeeperException, ReplicationException, DeserializationException {
975    /*
976     * We are active master now... go initialize components we need to run.
977     */
978    startupTaskGroup.addTask("Initializing Master file system");
979
980    this.masterActiveTime = EnvironmentEdgeManager.currentTime();
981    // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
982
983    // always initialize the MemStoreLAB as we use a region to store data in master now, see
984    // localStore.
985    initializeMemStoreChunkCreator(null);
986    this.fileSystemManager = new MasterFileSystem(conf);
987    this.walManager = new MasterWalManager(this);
988
989    // warm-up HTDs cache on master initialization
990    if (preLoadTableDescriptors) {
991      startupTaskGroup.addTask("Pre-loading table descriptors");
992      this.tableDescriptors.getAll();
993    }
994
995    // Publish cluster ID; set it in Master too. The superclass RegionServer does this later but
996    // only after it has checked in with the Master. At least a few tests ask Master for clusterId
997    // before it has called its run method and before RegionServer has done the reportForDuty.
998    ClusterId clusterId = fileSystemManager.getClusterId();
999    startupTaskGroup.addTask("Publishing Cluster ID " + clusterId + " in ZooKeeper");
1000    ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
1001    this.clusterId = clusterId.toString();
1002
1003    // Precaution. Put in place the old hbck1 lock file to fence out old hbase1s running their
1004    // hbck1s against an hbase2 cluster; it could do damage. To skip this behavior, set
1005    // hbase.write.hbck1.lock.file to false.
1006    if (this.conf.getBoolean("hbase.write.hbck1.lock.file", true)) {
1007      Pair<Path, FSDataOutputStream> result = null;
1008      try {
1009        result = HBaseFsck.checkAndMarkRunningHbck(this.conf,
1010          HBaseFsck.createLockRetryCounterFactory(this.conf).create());
1011      } finally {
1012        if (result != null) {
1013          Closeables.close(result.getSecond(), true);
1014        }
1015      }
1016    }
1017
1018    startupTaskGroup.addTask("Initialize ServerManager and schedule SCP for crash servers");
1019    // The below two managers must be created before loading procedures, as they will be used during
1020    // loading.
1021    // initialize master local region
1022    masterRegion = MasterRegionFactory.create(this);
1023    rsListStorage = new MasterRegionServerList(masterRegion, this);
1024
1025    // Initialize the ServerManager and register it as a configuration observer
1026    this.serverManager = createServerManager(this, rsListStorage);
1027    this.configurationManager.registerObserver(this.serverManager);
1028
1029    this.syncReplicationReplayWALManager = new SyncReplicationReplayWALManager(this);
1030    if (
1031      !conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
1032    ) {
1033      this.splitWALManager = new SplitWALManager(this);
1034    }
1035
1036    tryMigrateMetaLocationsFromZooKeeper();
1037
1038    createProcedureExecutor();
1039    Map<Class<?>, List<Procedure<MasterProcedureEnv>>> procsByType = procedureExecutor
1040      .getActiveProceduresNoCopy().stream().collect(Collectors.groupingBy(p -> p.getClass()));
1041
1042    // Create Assignment Manager
1043    this.assignmentManager = createAssignmentManager(this, masterRegion);
1044    this.assignmentManager.start();
1045    // TODO: TRSP can perform as the sub procedure for other procedures, so even if it is marked as
1046    // completed, it could still be in the procedure list. This is a bit strange but is another
1047    // story, need to verify the implementation for ProcedureExecutor and ProcedureStore.
1048    List<TransitRegionStateProcedure> ritList =
1049      procsByType.getOrDefault(TransitRegionStateProcedure.class, Collections.emptyList()).stream()
1050        .filter(p -> !p.isFinished()).map(p -> (TransitRegionStateProcedure) p)
1051        .collect(Collectors.toList());
1052    this.assignmentManager.setupRIT(ritList);
1053
1054    // Start RegionServerTracker with listing of servers found with exiting SCPs -- these should
1055    // be registered in the deadServers set -- and the servernames loaded from the WAL directory
1056    // and master local region that COULD BE 'alive'(we'll schedule SCPs for each and let SCP figure
1057    // it out).
1058    // We also pass dirs that are already 'splitting'... so we can do some checks down in tracker.
1059    // TODO: Generate the splitting and live Set in one pass instead of two as we currently do.
1060    this.regionServerTracker.upgrade(
1061      procsByType.getOrDefault(ServerCrashProcedure.class, Collections.emptyList()).stream()
1062        .map(p -> (ServerCrashProcedure) p).collect(
1063          Collectors.toMap(ServerCrashProcedure::getServerName, Procedure::getSubmittedTime)),
1064      Sets.union(rsListStorage.getAll(), walManager.getLiveServersFromWALDir()),
1065      walManager.getSplittingServersFromWALDir());
1066    // This manager must be accessed AFTER hbase:meta is confirmed on line..
1067    this.tableStateManager = new TableStateManager(this);
1068
1069    startupTaskGroup.addTask("Initializing ZK system trackers");
1070    initializeZKBasedSystemTrackers();
1071    startupTaskGroup.addTask("Loading last flushed sequence id of regions");
1072    try {
1073      this.serverManager.loadLastFlushedSequenceIds();
1074    } catch (IOException e) {
1075      LOG.info("Failed to load last flushed sequence id of regions" + " from file system", e);
1076    }
1077    // Set ourselves as active Master now our claim has succeeded up in zk.
1078    this.activeMaster = true;
1079
1080    // Start the Zombie master detector after setting master as active, see HBASE-21535
1081    Thread zombieDetector = new Thread(new MasterInitializationMonitor(this),
1082      "ActiveMasterInitializationMonitor-" + EnvironmentEdgeManager.currentTime());
1083    zombieDetector.setDaemon(true);
1084    zombieDetector.start();
1085
1086    if (!maintenanceMode) {
1087      startupTaskGroup.addTask("Initializing master coprocessors");
1088      setQuotasObserver(conf);
1089      this.cpHost = new MasterCoprocessorHost(this, conf);
1090    } else {
1091      // start an in process region server for carrying system regions
1092      maintenanceRegionServer =
1093        JVMClusterUtil.createRegionServerThread(getConfiguration(), HRegionServer.class, 0);
1094      maintenanceRegionServer.start();
1095    }
1096
1097    // Checking if meta needs initializing.
1098    startupTaskGroup.addTask("Initializing meta table if this is a new deploy");
1099    InitMetaProcedure initMetaProc = null;
1100    // Print out state of hbase:meta on startup; helps debugging.
1101    if (!this.assignmentManager.getRegionStates().hasTableRegionStates(TableName.META_TABLE_NAME)) {
1102      Optional<InitMetaProcedure> optProc = procedureExecutor.getProcedures().stream()
1103        .filter(p -> p instanceof InitMetaProcedure).map(o -> (InitMetaProcedure) o).findAny();
1104      initMetaProc = optProc.orElseGet(() -> {
1105        // schedule an init meta procedure if meta has not been deployed yet
1106        InitMetaProcedure temp = new InitMetaProcedure();
1107        procedureExecutor.submitProcedure(temp);
1108        return temp;
1109      });
1110    }
1111
1112    // initialize load balancer
1113    this.balancer.setMasterServices(this);
1114    this.balancer.initialize();
1115    this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());
1116
1117    // try migrate replication data
1118    ZKReplicationQueueStorageForMigration oldReplicationQueueStorage =
1119      new ZKReplicationQueueStorageForMigration(zooKeeper, conf);
1120    // check whether there are something to migrate and we haven't scheduled a migration procedure
1121    // yet
1122    if (
1123      oldReplicationQueueStorage.hasData() && procedureExecutor.getProcedures().stream()
1124        .allMatch(p -> !(p instanceof MigrateReplicationQueueFromZkToTableProcedure))
1125    ) {
1126      procedureExecutor.submitProcedure(new MigrateReplicationQueueFromZkToTableProcedure());
1127    }
1128    // start up all service threads.
1129    startupTaskGroup.addTask("Initializing master service threads");
1130    startServiceThreads();
1131    // wait meta to be initialized after we start procedure executor
1132    if (initMetaProc != null) {
1133      initMetaProc.await();
1134      if (initMetaProc.isFailed() && initMetaProc.hasException()) {
1135        throw new IOException("Failed to initialize meta table", initMetaProc.getException());
1136      }
1137    }
1138    // Wake up this server to check in
1139    sleeper.skipSleepCycle();
1140
1141    // Wait for region servers to report in.
1142    // With this as part of master initialization, it precludes our being able to start a single
1143    // server that is both Master and RegionServer. Needs more thought. TODO.
1144    String statusStr = "Wait for region servers to report in";
1145    MonitoredTask waitRegionServer = startupTaskGroup.addTask(statusStr);
1146    LOG.info(Objects.toString(waitRegionServer));
1147    waitForRegionServers(waitRegionServer);
1148
1149    // Check if master is shutting down because issue initializing regionservers or balancer.
1150    if (isStopped()) {
1151      return;
1152    }
1153
1154    startupTaskGroup.addTask("Starting assignment manager");
1155    // FIRST HBASE:META READ!!!!
1156    // The below cannot make progress w/o hbase:meta being online.
1157    // This is the FIRST attempt at going to hbase:meta. Meta on-lining is going on in background
1158    // as procedures run -- in particular SCPs for crashed servers... One should put up hbase:meta
1159    // if it is down. It may take a while to come online. So, wait here until meta if for sure
1160    // available. That's what waitForMetaOnline does.
1161    if (!waitForMetaOnline()) {
1162      return;
1163    }
1164
1165    TableDescriptor metaDescriptor = tableDescriptors.get(TableName.META_TABLE_NAME);
1166    final ColumnFamilyDescriptor tableFamilyDesc =
1167      metaDescriptor.getColumnFamily(HConstants.TABLE_FAMILY);
1168    final ColumnFamilyDescriptor replBarrierFamilyDesc =
1169      metaDescriptor.getColumnFamily(HConstants.REPLICATION_BARRIER_FAMILY);
1170
1171    this.assignmentManager.initializationPostMetaOnline();
1172    this.assignmentManager.joinCluster();
1173    // The below depends on hbase:meta being online.
1174    this.assignmentManager.processOfflineRegions();
1175    // this must be called after the above processOfflineRegions to prevent race
1176    this.assignmentManager.wakeMetaLoadedEvent();
1177
1178    // for migrating from a version without HBASE-25099, and also for honoring the configuration
1179    // first.
1180    if (conf.get(HConstants.META_REPLICAS_NUM) != null) {
1181      int replicasNumInConf =
1182        conf.getInt(HConstants.META_REPLICAS_NUM, HConstants.DEFAULT_META_REPLICA_NUM);
1183      TableDescriptor metaDesc = tableDescriptors.get(TableName.META_TABLE_NAME);
1184      if (metaDesc.getRegionReplication() != replicasNumInConf) {
1185        // it is possible that we already have some replicas before upgrading, so we must set the
1186        // region replication number in meta TableDescriptor directly first, without creating a
1187        // ModifyTableProcedure, otherwise it may cause a double assign for the meta replicas.
1188        int existingReplicasCount =
1189          assignmentManager.getRegionStates().getRegionsOfTable(TableName.META_TABLE_NAME).size();
1190        if (existingReplicasCount > metaDesc.getRegionReplication()) {
1191          LOG.info("Update replica count of hbase:meta from {}(in TableDescriptor)"
1192            + " to {}(existing ZNodes)", metaDesc.getRegionReplication(), existingReplicasCount);
1193          metaDesc = TableDescriptorBuilder.newBuilder(metaDesc)
1194            .setRegionReplication(existingReplicasCount).build();
1195          tableDescriptors.update(metaDesc);
1196        }
1197        // check again, and issue a ModifyTableProcedure if needed
1198        if (metaDesc.getRegionReplication() != replicasNumInConf) {
1199          LOG.info(
1200            "The {} config is {} while the replica count in TableDescriptor is {}"
1201              + " for hbase:meta, altering...",
1202            HConstants.META_REPLICAS_NUM, replicasNumInConf, metaDesc.getRegionReplication());
1203          procedureExecutor.submitProcedure(new ModifyTableProcedure(
1204            procedureExecutor.getEnvironment(), TableDescriptorBuilder.newBuilder(metaDesc)
1205              .setRegionReplication(replicasNumInConf).build(),
1206            null, metaDesc, false, true));
1207        }
1208      }
1209    }
1210    // Initialize after meta is up as below scans meta
1211    FavoredNodesManager fnm = getFavoredNodesManager();
1212    if (fnm != null) {
1213      fnm.initializeFromMeta();
1214    }
1215
1216    // set cluster status again after user regions are assigned
1217    this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());
1218
1219    // Start balancer and meta catalog janitor after meta and regions have been assigned.
1220    startupTaskGroup.addTask("Starting balancer and catalog janitor");
1221    this.clusterStatusChore = new ClusterStatusChore(this, balancer);
1222    getChoreService().scheduleChore(clusterStatusChore);
1223    this.balancerChore = new BalancerChore(this);
1224    if (!disableBalancerChoreForTest) {
1225      getChoreService().scheduleChore(balancerChore);
1226    }
1227    if (regionNormalizerManager != null) {
1228      getChoreService().scheduleChore(regionNormalizerManager.getRegionNormalizerChore());
1229    }
1230    this.catalogJanitorChore = new CatalogJanitor(this);
1231    getChoreService().scheduleChore(catalogJanitorChore);
1232    this.hbckChore = new HbckChore(this);
1233    getChoreService().scheduleChore(hbckChore);
1234    this.serverManager.startChore();
1235
1236    // Only for rolling upgrade, where we need to migrate the data in namespace table to meta table.
1237    if (!waitForNamespaceOnline()) {
1238      return;
1239    }
1240    startupTaskGroup.addTask("Starting cluster schema service");
1241    try {
1242      initClusterSchemaService();
1243    } catch (IllegalStateException e) {
1244      if (
1245        e.getCause() != null && e.getCause() instanceof NoSuchColumnFamilyException
1246          && tableFamilyDesc == null && replBarrierFamilyDesc == null
1247      ) {
1248        LOG.info("ClusterSchema service could not be initialized. This is "
1249          + "expected during HBase 1 to 2 upgrade", e);
1250      } else {
1251        throw e;
1252      }
1253    }
1254
1255    if (this.cpHost != null) {
1256      try {
1257        this.cpHost.preMasterInitialization();
1258      } catch (IOException e) {
1259        LOG.error("Coprocessor preMasterInitialization() hook failed", e);
1260      }
1261    }
1262
1263    LOG.info(String.format("Master has completed initialization %.3fsec",
1264      (EnvironmentEdgeManager.currentTime() - masterActiveTime) / 1000.0f));
1265    this.masterFinishedInitializationTime = EnvironmentEdgeManager.currentTime();
1266    configurationManager.registerObserver(this.balancer);
1267    configurationManager.registerObserver(this.logCleanerPool);
1268    configurationManager.registerObserver(this.logCleaner);
1269    configurationManager.registerObserver(this.regionsRecoveryConfigManager);
1270    configurationManager.registerObserver(this.exclusiveHFileCleanerPool);
1271    if (this.sharedHFileCleanerPool != null) {
1272      configurationManager.registerObserver(this.sharedHFileCleanerPool);
1273    }
1274    if (this.hfileCleaners != null) {
1275      for (HFileCleaner cleaner : hfileCleaners) {
1276        configurationManager.registerObserver(cleaner);
1277      }
1278    }
1279    // Set master as 'initialized'.
1280    setInitialized(true);
1281    startupTaskGroup.markComplete("Initialization successful");
1282    MonitoredTask status =
1283      TaskMonitor.get().createStatus("Progress after master initialized", false, true);
1284
1285    if (tableFamilyDesc == null && replBarrierFamilyDesc == null) {
1286      // create missing CFs in meta table after master is set to 'initialized'.
1287      createMissingCFsInMetaDuringUpgrade(metaDescriptor);
1288
1289      // Throwing this Exception to abort active master is painful but this
1290      // seems the only way to add missing CFs in meta while upgrading from
1291      // HBase 1 to 2 (where HBase 2 has HBASE-23055 & HBASE-23782 checked-in).
1292      // So, why do we abort active master after adding missing CFs in meta?
1293      // When we reach here, we would have already bypassed NoSuchColumnFamilyException
1294      // in initClusterSchemaService(), meaning ClusterSchemaService is not
1295      // correctly initialized but we bypassed it. Similarly, we bypassed
1296      // tableStateManager.start() as well. Hence, we should better abort
1297      // current active master because our main task - adding missing CFs
1298      // in meta table is done (possible only after master state is set as
1299      // initialized) at the expense of bypassing few important tasks as part
1300      // of active master init routine. So now we abort active master so that
1301      // next active master init will not face any issues and all mandatory
1302      // services will be started during master init phase.
1303      throw new PleaseRestartMasterException("Aborting active master after missing"
1304        + " CFs are successfully added in meta. Subsequent active master "
1305        + "initialization should be uninterrupted");
1306    }
1307
1308    if (maintenanceMode) {
1309      LOG.info("Detected repair mode, skipping final initialization steps.");
1310      return;
1311    }
1312
1313    assignmentManager.checkIfShouldMoveSystemRegionAsync();
1314    status.setStatus("Starting quota manager");
1315    initQuotaManager();
1316    if (QuotaUtil.isQuotaEnabled(conf)) {
1317      // Create the quota snapshot notifier
1318      spaceQuotaSnapshotNotifier = createQuotaSnapshotNotifier();
1319      spaceQuotaSnapshotNotifier.initialize(getConnection());
1320      this.quotaObserverChore = new QuotaObserverChore(this, getMasterMetrics());
1321      // Start the chore to read the region FS space reports and act on them
1322      getChoreService().scheduleChore(quotaObserverChore);
1323
1324      this.snapshotQuotaChore = new SnapshotQuotaObserverChore(this, getMasterMetrics());
1325      // Start the chore to read snapshots and add their usage to table/NS quotas
1326      getChoreService().scheduleChore(snapshotQuotaChore);
1327    }
1328    final SlowLogMasterService slowLogMasterService = new SlowLogMasterService(conf, this);
1329    slowLogMasterService.init();
1330
1331    WALEventTrackerTableCreator.createIfNeededAndNotExists(conf, this);
1332    // Create REPLICATION.SINK_TRACKER table if needed.
1333    ReplicationSinkTrackerTableCreator.createIfNeededAndNotExists(conf, this);
1334
1335    // clear the dead servers with same host name and port of online server because we are not
1336    // removing dead server with same hostname and port of rs which is trying to check in before
1337    // master initialization. See HBASE-5916.
1338    this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
1339
1340    // Check and set the znode ACLs if needed in case we are overtaking a non-secure configuration
1341    status.setStatus("Checking ZNode ACLs");
1342    zooKeeper.checkAndSetZNodeAcls();
1343
1344    status.setStatus("Initializing MOB Cleaner");
1345    initMobCleaner();
1346
1347    // delete the stale data for replication sync up tool if necessary
1348    status.setStatus("Cleanup ReplicationSyncUp status if necessary");
1349    Path replicationSyncUpInfoFile =
1350      new Path(new Path(dataRootDir, ReplicationSyncUp.INFO_DIR), ReplicationSyncUp.INFO_FILE);
1351    if (dataFs.exists(replicationSyncUpInfoFile)) {
1352      // info file is available, load the timestamp and use it to clean up stale data in replication
1353      // queue storage.
1354      byte[] data;
1355      try (FSDataInputStream in = dataFs.open(replicationSyncUpInfoFile)) {
1356        data = ByteStreams.toByteArray(in);
1357      }
1358      ReplicationSyncUpToolInfo info = null;
1359      try {
1360        info = JsonMapper.fromJson(Bytes.toString(data), ReplicationSyncUpToolInfo.class);
1361      } catch (JsonParseException e) {
1362        // usually this should be a partial file, which means the ReplicationSyncUp tool did not
1363        // finish properly, so not a problem. Here we do not clean up the status as we do not know
1364        // the reason why the tool did not finish properly, so let users clean the status up
1365        // manually
1366        LOG.warn("failed to parse replication sync up info file, ignore and continue...", e);
1367      }
1368      if (info != null) {
1369        LOG.info("Remove last sequence ids and hfile references which are written before {}({})",
1370          info.getStartTimeMs(), DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneId.systemDefault())
1371            .format(Instant.ofEpochMilli(info.getStartTimeMs())));
1372        replicationPeerManager.getQueueStorage()
1373          .removeLastSequenceIdsAndHFileRefsBefore(info.getStartTimeMs());
1374        // delete the file after removing the stale data, so next time we do not need to do this
1375        // again.
1376        dataFs.delete(replicationSyncUpInfoFile, false);
1377      }
1378    }
1379    status.setStatus("Calling postStartMaster coprocessors");
1380    if (this.cpHost != null) {
1381      // don't let cp initialization errors kill the master
1382      try {
1383        this.cpHost.postStartMaster();
1384      } catch (IOException ioe) {
1385        LOG.error("Coprocessor postStartMaster() hook failed", ioe);
1386      }
1387    }
1388
1389    zombieDetector.interrupt();
1390
1391    /*
1392     * After master has started up, lets do balancer post startup initialization. Since this runs in
1393     * activeMasterManager thread, it should be fine.
1394     */
1395    long start = EnvironmentEdgeManager.currentTime();
1396    this.balancer.postMasterStartupInitialize();
1397    if (LOG.isDebugEnabled()) {
1398      LOG.debug("Balancer post startup initialization complete, took "
1399        + ((EnvironmentEdgeManager.currentTime() - start) / 1000) + " seconds");
1400    }
1401
1402    this.rollingUpgradeChore = new RollingUpgradeChore(this);
1403    getChoreService().scheduleChore(rollingUpgradeChore);
1404
1405    this.oldWALsDirSizeChore = new OldWALsDirSizeChore(this);
1406    getChoreService().scheduleChore(this.oldWALsDirSizeChore);
1407
1408    status.markComplete("Progress after master initialized complete");
1409  }
1410
1411  /**
1412   * Used for testing only to set Mock objects.
1413   * @param hbckChore hbckChore
1414   */
1415  public void setHbckChoreForTesting(HbckChore hbckChore) {
1416    this.hbckChore = hbckChore;
1417  }
1418
1419  /**
1420   * Used for testing only to set Mock objects.
1421   * @param catalogJanitorChore catalogJanitorChore
1422   */
1423  public void setCatalogJanitorChoreForTesting(CatalogJanitor catalogJanitorChore) {
1424    this.catalogJanitorChore = catalogJanitorChore;
1425  }
1426
1427  private void createMissingCFsInMetaDuringUpgrade(TableDescriptor metaDescriptor)
1428    throws IOException {
1429    TableDescriptor newMetaDesc = TableDescriptorBuilder.newBuilder(metaDescriptor)
1430      .setColumnFamily(FSTableDescriptors.getTableFamilyDescForMeta(conf))
1431      .setColumnFamily(FSTableDescriptors.getReplBarrierFamilyDescForMeta()).build();
1432    long pid = this.modifyTable(TableName.META_TABLE_NAME, () -> newMetaDesc, 0, 0, false);
1433    waitForProcedureToComplete(pid, "Failed to add table and rep_barrier CFs to meta");
1434  }
1435
1436  private void waitForProcedureToComplete(long pid, String errorMessage) throws IOException {
1437    int tries = 30;
1438    while (
1439      !(getMasterProcedureExecutor().isFinished(pid)) && getMasterProcedureExecutor().isRunning()
1440        && tries > 0
1441    ) {
1442      try {
1443        Thread.sleep(1000);
1444      } catch (InterruptedException e) {
1445        throw new IOException("Wait interrupted", e);
1446      }
1447      tries--;
1448    }
1449    if (tries <= 0) {
1450      throw new HBaseIOException(
1451        "Failed to add table and rep_barrier CFs to meta in a given time.");
1452    } else {
1453      Procedure<?> result = getMasterProcedureExecutor().getResult(pid);
1454      if (result != null && result.isFailed()) {
1455        throw new IOException(
1456          errorMessage + ". " + MasterProcedureUtil.unwrapRemoteIOException(result));
1457      }
1458    }
1459  }
1460
1461  /**
1462   * Check hbase:meta is up and ready for reading. For use during Master startup only.
1463   * @return True if meta is UP and online and startup can progress. Otherwise, meta is not online
1464   *         and we will hold here until operator intervention.
1465   */
1466  @InterfaceAudience.Private
1467  public boolean waitForMetaOnline() {
1468    return isRegionOnline(RegionInfoBuilder.FIRST_META_REGIONINFO);
1469  }
1470
1471  /**
1472   * @return True if region is online and scannable else false if an error or shutdown (Otherwise we
1473   *         just block in here holding up all forward-progess).
1474   */
1475  private boolean isRegionOnline(RegionInfo ri) {
1476    RetryCounter rc = null;
1477    while (!isStopped()) {
1478      RegionState rs = this.assignmentManager.getRegionStates().getRegionState(ri);
1479      if (rs != null && rs.isOpened()) {
1480        if (this.getServerManager().isServerOnline(rs.getServerName())) {
1481          return true;
1482        }
1483      }
1484      // Region is not OPEN.
1485      Optional<Procedure<MasterProcedureEnv>> optProc = this.procedureExecutor.getProcedures()
1486        .stream().filter(p -> p instanceof ServerCrashProcedure).findAny();
1487      // TODO: Add a page to refguide on how to do repair. Have this log message point to it.
1488      // Page will talk about loss of edits, how to schedule at least the meta WAL recovery, and
1489      // then how to assign including how to break region lock if one held.
1490      LOG.warn(
1491        "{} is NOT online; state={}; ServerCrashProcedures={}. Master startup cannot "
1492          + "progress, in holding-pattern until region onlined.",
1493        ri.getRegionNameAsString(), rs, optProc.isPresent());
1494      // Check once-a-minute.
1495      if (rc == null) {
1496        rc = new RetryCounterFactory(Integer.MAX_VALUE, 1000, 60_000).create();
1497      }
1498      Threads.sleep(rc.getBackoffTimeAndIncrementAttempts());
1499    }
1500    return false;
1501  }
1502
1503  /**
1504   * Check hbase:namespace table is assigned. If not, startup will hang looking for the ns table
1505   * <p/>
1506   * This is for rolling upgrading, later we will migrate the data in ns table to the ns family of
1507   * meta table. And if this is a new cluster, this method will return immediately as there will be
1508   * no namespace table/region.
1509   * @return True if namespace table is up/online.
1510   */
1511  private boolean waitForNamespaceOnline() throws IOException {
1512    TableState nsTableState =
1513      MetaTableAccessor.getTableState(getConnection(), TableName.NAMESPACE_TABLE_NAME);
1514    if (nsTableState == null || nsTableState.isDisabled()) {
1515      // this means we have already migrated the data and disabled or deleted the namespace table,
1516      // or this is a new deploy which does not have a namespace table from the beginning.
1517      return true;
1518    }
1519    List<RegionInfo> ris =
1520      this.assignmentManager.getRegionStates().getRegionsOfTable(TableName.NAMESPACE_TABLE_NAME);
1521    if (ris.isEmpty()) {
1522      // maybe this will not happen any more, but anyway, no harm to add a check here...
1523      return true;
1524    }
1525    // Else there are namespace regions up in meta. Ensure they are assigned before we go on.
1526    for (RegionInfo ri : ris) {
1527      if (!isRegionOnline(ri)) {
1528        return false;
1529      }
1530    }
1531    return true;
1532  }
1533
1534  /**
1535   * Adds the {@code MasterQuotasObserver} to the list of configured Master observers to
1536   * automatically remove quotas for a table when that table is deleted.
1537   */
1538  @InterfaceAudience.Private
1539  public void updateConfigurationForQuotasObserver(Configuration conf) {
1540    // We're configured to not delete quotas on table deletion, so we don't need to add the obs.
1541    if (
1542      !conf.getBoolean(MasterQuotasObserver.REMOVE_QUOTA_ON_TABLE_DELETE,
1543        MasterQuotasObserver.REMOVE_QUOTA_ON_TABLE_DELETE_DEFAULT)
1544    ) {
1545      return;
1546    }
1547    String[] masterCoprocs = conf.getStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY);
1548    final int length = null == masterCoprocs ? 0 : masterCoprocs.length;
1549    String[] updatedCoprocs = new String[length + 1];
1550    if (length > 0) {
1551      System.arraycopy(masterCoprocs, 0, updatedCoprocs, 0, masterCoprocs.length);
1552    }
1553    updatedCoprocs[length] = MasterQuotasObserver.class.getName();
1554    conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, updatedCoprocs);
1555  }
1556
1557  private void initMobCleaner() {
1558    this.mobFileCleanerChore = new MobFileCleanerChore(this);
1559    getChoreService().scheduleChore(mobFileCleanerChore);
1560    this.mobFileCompactionChore = new MobFileCompactionChore(this);
1561    getChoreService().scheduleChore(mobFileCompactionChore);
1562  }
1563
1564  /**
1565   * <p>
1566   * Create a {@link ServerManager} instance.
1567   * </p>
1568   * <p>
1569   * Will be overridden in tests.
1570   * </p>
1571   */
1572  @InterfaceAudience.Private
1573  protected ServerManager createServerManager(MasterServices master, RegionServerList storage)
1574    throws IOException {
1575    // We put this out here in a method so can do a Mockito.spy and stub it out
1576    // w/ a mocked up ServerManager.
1577    setupClusterConnection();
1578    return new ServerManager(master, storage);
1579  }
1580
  /**
   * Blocks until enough region servers have checked in, per the server manager's configured
   * thresholds; progress is reported on {@code status}.
   */
  private void waitForRegionServers(final MonitoredTask status)
    throws IOException, InterruptedException {
    this.serverManager.waitForRegionServers(status);
  }
1585
1586  // Will be overridden in tests
1587  @InterfaceAudience.Private
1588  protected void initClusterSchemaService() throws IOException, InterruptedException {
1589    this.clusterSchemaService = new ClusterSchemaServiceImpl(this);
1590    this.clusterSchemaService.startAsync();
1591    try {
1592      this.clusterSchemaService
1593        .awaitRunning(getConfiguration().getInt(HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS,
1594          DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS), TimeUnit.SECONDS);
1595    } catch (TimeoutException toe) {
1596      throw new IOException("Timedout starting ClusterSchemaService", toe);
1597    }
1598  }
1599
1600  private void initQuotaManager() throws IOException {
1601    MasterQuotaManager quotaManager = new MasterQuotaManager(this);
1602    quotaManager.start();
1603    this.quotaManager = quotaManager;
1604  }
1605
1606  private SpaceQuotaSnapshotNotifier createQuotaSnapshotNotifier() {
1607    SpaceQuotaSnapshotNotifier notifier =
1608      SpaceQuotaSnapshotNotifierFactory.getInstance().create(getConfiguration());
1609    return notifier;
1610  }
1611
1612  public boolean isCatalogJanitorEnabled() {
1613    return catalogJanitorChore != null ? catalogJanitorChore.getEnabled() : false;
1614  }
1615
1616  boolean isCleanerChoreEnabled() {
1617    boolean hfileCleanerFlag = true, logCleanerFlag = true;
1618
1619    if (getHFileCleaner() != null) {
1620      hfileCleanerFlag = getHFileCleaner().getEnabled();
1621    }
1622
1623    if (logCleaner != null) {
1624      logCleanerFlag = logCleaner.getEnabled();
1625    }
1626
1627    return (hfileCleanerFlag && logCleanerFlag);
1628  }
1629
  /** Returns the master's {@link ServerManager}. */
  @Override
  public ServerManager getServerManager() {
    return this.serverManager;
  }
1634
  /** Returns the master's filesystem manager. */
  @Override
  public MasterFileSystem getMasterFileSystem() {
    return this.fileSystemManager;
  }
1639
  /** Returns the master's WAL manager. */
  @Override
  public MasterWalManager getMasterWalManager() {
    return this.walManager;
  }
1644
  /**
   * Stub: system key rotation is not yet implemented, so this always reports no rotation.
   * @return false always (feature not yet implemented)
   */
  @Override
  public boolean rotateSystemKeyIfChanged() throws IOException {
    // STUB - Feature not yet implemented
    return false;
  }
1650
  /** Returns the split WAL manager; may be null depending on configuration — TODO confirm. */
  @Override
  public SplitWALManager getSplitWALManager() {
    return splitWALManager;
  }
1655
  /** Returns the master's table state manager. */
  @Override
  public TableStateManager getTableStateManager() {
    return tableStateManager;
  }
1660
1661  /*
1662   * Start up all services. If any of these threads gets an unhandled exception then they just die
1663   * with a logged message. This should be fine because in general, we do not expect the master to
1664   * get such unhandled exceptions as OOMEs; it should be lightly loaded. See what HRegionServer
1665   * does if need to install an unexpected exception handler.
1666   */
1667  private void startServiceThreads() throws IOException {
1668    // Start the executor service pools
1669    final int masterOpenRegionPoolSize = conf.getInt(HConstants.MASTER_OPEN_REGION_THREADS,
1670      HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT);
1671    executorService.startExecutorService(executorService.new ExecutorConfig()
1672      .setExecutorType(ExecutorType.MASTER_OPEN_REGION).setCorePoolSize(masterOpenRegionPoolSize));
1673    final int masterCloseRegionPoolSize = conf.getInt(HConstants.MASTER_CLOSE_REGION_THREADS,
1674      HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT);
1675    executorService.startExecutorService(
1676      executorService.new ExecutorConfig().setExecutorType(ExecutorType.MASTER_CLOSE_REGION)
1677        .setCorePoolSize(masterCloseRegionPoolSize));
1678    final int masterServerOpThreads = conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS,
1679      HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT);
1680    executorService.startExecutorService(
1681      executorService.new ExecutorConfig().setExecutorType(ExecutorType.MASTER_SERVER_OPERATIONS)
1682        .setCorePoolSize(masterServerOpThreads));
1683    final int masterServerMetaOpsThreads =
1684      conf.getInt(HConstants.MASTER_META_SERVER_OPERATIONS_THREADS,
1685        HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT);
1686    executorService.startExecutorService(executorService.new ExecutorConfig()
1687      .setExecutorType(ExecutorType.MASTER_META_SERVER_OPERATIONS)
1688      .setCorePoolSize(masterServerMetaOpsThreads));
1689    final int masterLogReplayThreads = conf.getInt(HConstants.MASTER_LOG_REPLAY_OPS_THREADS,
1690      HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT);
1691    executorService.startExecutorService(executorService.new ExecutorConfig()
1692      .setExecutorType(ExecutorType.M_LOG_REPLAY_OPS).setCorePoolSize(masterLogReplayThreads));
1693    final int masterSnapshotThreads = conf.getInt(SnapshotManager.SNAPSHOT_POOL_THREADS_KEY,
1694      SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT);
1695    executorService.startExecutorService(
1696      executorService.new ExecutorConfig().setExecutorType(ExecutorType.MASTER_SNAPSHOT_OPERATIONS)
1697        .setCorePoolSize(masterSnapshotThreads).setAllowCoreThreadTimeout(true));
1698    final int masterMergeDispatchThreads = conf.getInt(HConstants.MASTER_MERGE_DISPATCH_THREADS,
1699      HConstants.MASTER_MERGE_DISPATCH_THREADS_DEFAULT);
1700    executorService.startExecutorService(
1701      executorService.new ExecutorConfig().setExecutorType(ExecutorType.MASTER_MERGE_OPERATIONS)
1702        .setCorePoolSize(masterMergeDispatchThreads).setAllowCoreThreadTimeout(true));
1703
1704    // We depend on there being only one instance of this executor running
1705    // at a time. To do concurrency, would need fencing of enable/disable of
1706    // tables.
1707    // Any time changing this maxThreads to > 1, pls see the comment at
1708    // AccessController#postCompletedCreateTableAction
1709    executorService.startExecutorService(executorService.new ExecutorConfig()
1710      .setExecutorType(ExecutorType.MASTER_TABLE_OPERATIONS).setCorePoolSize(1));
1711    startProcedureExecutor();
1712
1713    // Create log cleaner thread pool
1714    logCleanerPool = DirScanPool.getLogCleanerScanPool(conf);
1715    Map<String, Object> params = new HashMap<>();
1716    params.put(MASTER, this);
1717    // Start log cleaner thread
1718    int cleanerInterval =
1719      conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
1720    this.logCleaner =
1721      new LogCleaner(cleanerInterval, this, conf, getMasterWalManager().getFileSystem(),
1722        getMasterWalManager().getOldLogDir(), logCleanerPool, params);
1723    getChoreService().scheduleChore(logCleaner);
1724
1725    Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
1726
1727    // Create custom archive hfile cleaners
1728    String[] paths = conf.getStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS);
1729    // todo: handle the overlap issues for the custom paths
1730
1731    if (paths != null && paths.length > 0) {
1732      if (conf.getStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS) == null) {
1733        Set<String> cleanerClasses = new HashSet<>();
1734        String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
1735        if (cleaners != null) {
1736          Collections.addAll(cleanerClasses, cleaners);
1737        }
1738        conf.setStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS,
1739          cleanerClasses.toArray(new String[cleanerClasses.size()]));
1740        LOG.info("Archive custom cleaner paths: {}, plugins: {}", Arrays.asList(paths),
1741          cleanerClasses);
1742      }
1743      // share the hfile cleaner pool in custom paths
1744      sharedHFileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf.get(CUSTOM_POOL_SIZE, "6"));
1745      for (int i = 0; i < paths.length; i++) {
1746        Path path = new Path(paths[i].trim());
1747        HFileCleaner cleaner =
1748          new HFileCleaner("ArchiveCustomHFileCleaner-" + path.getName(), cleanerInterval, this,
1749            conf, getMasterFileSystem().getFileSystem(), new Path(archiveDir, path),
1750            HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS, sharedHFileCleanerPool, params, null);
1751        hfileCleaners.add(cleaner);
1752        hfileCleanerPaths.add(path);
1753      }
1754    }
1755
1756    // Create the whole archive dir cleaner thread pool
1757    exclusiveHFileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf);
1758    hfileCleaners.add(0,
1759      new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem().getFileSystem(),
1760        archiveDir, exclusiveHFileCleanerPool, params, hfileCleanerPaths));
1761    hfileCleanerPaths.add(0, archiveDir);
1762    // Schedule all the hfile cleaners
1763    for (HFileCleaner hFileCleaner : hfileCleaners) {
1764      getChoreService().scheduleChore(hFileCleaner);
1765    }
1766
1767    // Regions Reopen based on very high storeFileRefCount is considered enabled
1768    // only if hbase.regions.recovery.store.file.ref.count has value > 0
1769    final int maxStoreFileRefCount = conf.getInt(HConstants.STORE_FILE_REF_COUNT_THRESHOLD,
1770      HConstants.DEFAULT_STORE_FILE_REF_COUNT_THRESHOLD);
1771    if (maxStoreFileRefCount > 0) {
1772      this.regionsRecoveryChore = new RegionsRecoveryChore(this, conf, this);
1773      getChoreService().scheduleChore(this.regionsRecoveryChore);
1774    } else {
1775      LOG.info(
1776        "Reopening regions with very high storeFileRefCount is disabled. "
1777          + "Provide threshold value > 0 for {} to enable it.",
1778        HConstants.STORE_FILE_REF_COUNT_THRESHOLD);
1779    }
1780
1781    this.regionsRecoveryConfigManager = new RegionsRecoveryConfigManager(this);
1782
1783    replicationBarrierCleaner =
1784      new ReplicationBarrierCleaner(conf, this, getConnection(), replicationPeerManager);
1785    getChoreService().scheduleChore(replicationBarrierCleaner);
1786
1787    final boolean isSnapshotChoreEnabled = this.snapshotCleanupStateStore.get();
1788    this.snapshotCleanerChore = new SnapshotCleanerChore(this, conf, getSnapshotManager());
1789    if (isSnapshotChoreEnabled) {
1790      getChoreService().scheduleChore(this.snapshotCleanerChore);
1791    } else {
1792      if (LOG.isTraceEnabled()) {
1793        LOG.trace("Snapshot Cleaner Chore is disabled. Not starting up the chore..");
1794      }
1795    }
1796    serviceStarted = true;
1797    if (LOG.isTraceEnabled()) {
1798      LOG.trace("Started service threads");
1799    }
1800  }
1801
  /**
   * Stops the services started by {@link #startServiceThreads()} and the rest of the master's
   * machinery. Every stop is null-guarded because this may run after a partial startup. The
   * ordering below is deliberate — in particular the procedure executor is stopped before the
   * server/assignment managers (see HBASE-24117).
   */
  protected void stopServiceThreads() {
    if (masterJettyServer != null) {
      LOG.info("Stopping master jetty server");
      try {
        masterJettyServer.stop();
      } catch (Exception e) {
        LOG.error("Failed to stop master jetty server", e);
      }
    }
    stopChoreService();
    stopExecutorService();
    // Shut down the cleaner dir-scan pools; fields are nulled so a repeat call is a no-op.
    if (exclusiveHFileCleanerPool != null) {
      exclusiveHFileCleanerPool.shutdownNow();
      exclusiveHFileCleanerPool = null;
    }
    if (logCleanerPool != null) {
      logCleanerPool.shutdownNow();
      logCleanerPool = null;
    }
    if (sharedHFileCleanerPool != null) {
      sharedHFileCleanerPool.shutdownNow();
      sharedHFileCleanerPool = null;
    }
    if (maintenanceRegionServer != null) {
      maintenanceRegionServer.getRegionServer().stop(HBASE_MASTER_CLEANER_INTERVAL);
    }

    LOG.debug("Stopping service threads");
    // stop procedure executor prior to other services such as server manager and assignment
    // manager, as these services are important for some running procedures. See HBASE-24117 for
    // example.
    stopProcedureExecutor();

    if (regionNormalizerManager != null) {
      regionNormalizerManager.stop();
    }
    if (this.quotaManager != null) {
      this.quotaManager.stop();
    }

    if (this.activeMasterManager != null) {
      this.activeMasterManager.stop();
    }
    if (this.serverManager != null) {
      this.serverManager.stop();
    }
    if (this.assignmentManager != null) {
      this.assignmentManager.stop();
    }

    if (masterRegion != null) {
      // Pass the abort flag so the region can decide how to close — TODO confirm semantics.
      masterRegion.close(isAborted());
    }
    if (this.walManager != null) {
      this.walManager.stop();
    }
    if (this.fileSystemManager != null) {
      this.fileSystemManager.stop();
    }
    if (this.mpmHost != null) {
      this.mpmHost.stop("server shutting down.");
    }
    if (this.regionServerTracker != null) {
      this.regionServerTracker.stop();
    }
  }
1868
  /**
   * Builds the master procedure store and executor. The executor is only initialized here — its
   * workers are started later via {@link #startProcedureExecutor()} (see the javadoc on
   * finishActiveMasterInitialization for why).
   * @throws IOException if the remote dispatcher fails to start
   */
  private void createProcedureExecutor() throws IOException {
    // The dispatcher implementation is pluggable via configuration.
    final String procedureDispatcherClassName =
      conf.get(HBASE_MASTER_RSPROC_DISPATCHER_CLASS, DEFAULT_HBASE_MASTER_RSPROC_DISPATCHER_CLASS);
    final RSProcedureDispatcher procedureDispatcher = ReflectionUtils.instantiateWithCustomCtor(
      procedureDispatcherClassName, new Class[] { MasterServices.class }, new Object[] { this });
    final MasterProcedureEnv procEnv = new MasterProcedureEnv(this, procedureDispatcher);
    procedureStore = new RegionProcedureStore(this, masterRegion,
      new MasterProcedureEnv.FsUtilsLeaseRecovery(this));
    // Losing the store lease is fatal: abort so another master can take over.
    procedureStore.registerListener(new ProcedureStoreListener() {

      @Override
      public void abortProcess() {
        abort("The Procedure Store lost the lease", null);
      }
    });
    MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler();
    procedureExecutor = new ProcedureExecutor<>(conf, procEnv, procedureStore, procedureScheduler);
    configurationManager.registerObserver(procEnv);

    // Default worker count: a quarter of the CPUs, but never below the configured minimum.
    int cpus = Runtime.getRuntime().availableProcessors();
    int defaultNumThreads = Math.max((cpus > 0 ? cpus / 4 : 0),
      MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS);
    int numThreads =
      conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, defaultNumThreads);
    if (numThreads <= 0) {
      LOG.warn("{} is set to {}, which is invalid, using default value {} instead",
        MasterProcedureConstants.MASTER_PROCEDURE_THREADS, numThreads, defaultNumThreads);
      numThreads = defaultNumThreads;
    }
    final boolean abortOnCorruption =
      conf.getBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
        MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
    procedureStore.start(numThreads);
    // Just initialize it but do not start the workers, we will start the workers later by calling
    // startProcedureExecutor. See the javadoc for finishActiveMasterInitialization for more
    // details.
    procedureExecutor.init(numThreads, abortOnCorruption);
    if (!procEnv.getRemoteDispatcher().start()) {
      throw new HBaseIOException("Failed start of remote dispatcher");
    }
  }
1910
  /**
   * Starts the procedure executor's worker threads; the executor itself was initialized earlier
   * in createProcedureExecutor. Will be overridden in unit tests.
   */
  protected void startProcedureExecutor() throws IOException {
    procedureExecutor.startWorkers();
  }
1915
1916  /**
1917   * Turn on/off Snapshot Cleanup Chore
1918   * @param on indicates whether Snapshot Cleanup Chore is to be run
1919   */
1920  void switchSnapshotCleanup(final boolean on, final boolean synchronous) throws IOException {
1921    if (synchronous) {
1922      synchronized (this.snapshotCleanerChore) {
1923        switchSnapshotCleanup(on);
1924      }
1925    } else {
1926      switchSnapshotCleanup(on);
1927    }
1928  }
1929
1930  private void switchSnapshotCleanup(final boolean on) throws IOException {
1931    snapshotCleanupStateStore.set(on);
1932    if (on) {
1933      getChoreService().scheduleChore(this.snapshotCleanerChore);
1934    } else {
1935      this.snapshotCleanerChore.cancel();
1936    }
1937  }
1938
  /**
   * Shuts down the procedure executor and its store. Null-guarded and fields are nulled after
   * stopping, so this is safe to call after a partial startup or more than once.
   */
  private void stopProcedureExecutor() {
    if (procedureExecutor != null) {
      configurationManager.deregisterObserver(procedureExecutor.getEnvironment());
      // Stop remote dispatch first so no new work is sent to region servers.
      procedureExecutor.getEnvironment().getRemoteDispatcher().stop();
      procedureExecutor.stop();
      procedureExecutor.join();
      procedureExecutor = null;
    }

    if (procedureStore != null) {
      procedureStore.stop(isAborted());
      procedureStore = null;
    }
  }
1953
  /**
   * Shuts down all master chores. Chores that are only conditionally created may be null here;
   * shutdownChore presumably tolerates null arguments — TODO confirm against its definition.
   */
  protected void stopChores() {
    shutdownChore(mobFileCleanerChore);
    shutdownChore(mobFileCompactionChore);
    shutdownChore(balancerChore);
    if (regionNormalizerManager != null) {
      shutdownChore(regionNormalizerManager.getRegionNormalizerChore());
    }
    shutdownChore(clusterStatusChore);
    shutdownChore(catalogJanitorChore);
    shutdownChore(clusterStatusPublisherChore);
    shutdownChore(snapshotQuotaChore);
    shutdownChore(logCleaner);
    // The hfile cleaners are a list; shut each down and drop the list so a repeat call no-ops.
    if (hfileCleaners != null) {
      for (ScheduledChore chore : hfileCleaners) {
        chore.shutdown();
      }
      hfileCleaners = null;
    }
    shutdownChore(replicationBarrierCleaner);
    shutdownChore(snapshotCleanerChore);
    shutdownChore(hbckChore);
    shutdownChore(regionsRecoveryChore);
    shutdownChore(rollingUpgradeChore);
    shutdownChore(oldWALsDirSizeChore);
  }
1979
1980  /** Returns Get remote side's InetAddress */
1981  InetAddress getRemoteInetAddress(final int port, final long serverStartCode)
1982    throws UnknownHostException {
1983    // Do it out here in its own little method so can fake an address when
1984    // mocking up in tests.
1985    InetAddress ia = RpcServer.getRemoteIp();
1986
1987    // The call could be from the local regionserver,
1988    // in which case, there is no remote address.
1989    if (ia == null && serverStartCode == startcode) {
1990      InetSocketAddress isa = rpcServices.getSocketAddress();
1991      if (isa != null && isa.getPort() == port) {
1992        ia = isa.getAddress();
1993      }
1994    }
1995    return ia;
1996  }
1997
1998  /** Returns Maximum time we should run balancer for */
1999  private int getMaxBalancingTime() {
2000    // if max balancing time isn't set, defaulting it to period time
2001    int maxBalancingTime =
2002      getConfiguration().getInt(HConstants.HBASE_BALANCER_MAX_BALANCING, getConfiguration()
2003        .getInt(HConstants.HBASE_BALANCER_PERIOD, HConstants.DEFAULT_HBASE_BALANCER_PERIOD));
2004    return maxBalancingTime;
2005  }
2006
2007  /** Returns Maximum number of regions in transition */
2008  private int getMaxRegionsInTransition() {
2009    int numRegions = this.assignmentManager.getRegionStates().getRegionAssignments().size();
2010    return Math.max((int) Math.floor(numRegions * this.maxRitPercent), 1);
2011  }
2012
2013  /**
2014   * It first sleep to the next balance plan start time. Meanwhile, throttling by the max number
2015   * regions in transition to protect availability.
2016   * @param nextBalanceStartTime   The next balance plan start time
2017   * @param maxRegionsInTransition max number of regions in transition
2018   * @param cutoffTime             when to exit balancer
2019   */
2020  private void balanceThrottling(long nextBalanceStartTime, int maxRegionsInTransition,
2021    long cutoffTime) {
2022    boolean interrupted = false;
2023
2024    // Sleep to next balance plan start time
2025    // But if there are zero regions in transition, it can skip sleep to speed up.
2026    while (
2027      !interrupted && EnvironmentEdgeManager.currentTime() < nextBalanceStartTime
2028        && this.assignmentManager.getRegionTransitScheduledCount() > 0
2029    ) {
2030      try {
2031        Thread.sleep(100);
2032      } catch (InterruptedException ie) {
2033        interrupted = true;
2034      }
2035    }
2036
2037    // Throttling by max number regions in transition
2038    while (
2039      !interrupted && maxRegionsInTransition > 0
2040        && this.assignmentManager.getRegionTransitScheduledCount() >= maxRegionsInTransition
2041        && EnvironmentEdgeManager.currentTime() <= cutoffTime
2042    ) {
2043      try {
2044        // sleep if the number of regions in transition exceeds the limit
2045        Thread.sleep(100);
2046      } catch (InterruptedException ie) {
2047        interrupted = true;
2048      }
2049    }
2050
2051    if (interrupted) Thread.currentThread().interrupt();
2052  }
2053
  /**
   * Trigger a normal balance run using the default {@link BalanceRequest}.
   * @return response describing whether the balancer ran and how many moves were made
   */
  public BalanceResponse balance() throws IOException {
    return balance(BalanceRequest.defaultInstance());
  }
2057
2058  /**
2059   * Trigger a normal balance, see {@link HMaster#balance()} . If the balance is not executed this
2060   * time, the metrics related to the balance will be updated. When balance is running, related
2061   * metrics will be updated at the same time. But if some checking logic failed and cause the
2062   * balancer exit early, we lost the chance to update balancer metrics. This will lead to user
2063   * missing the latest balancer info.
2064   */
2065  public BalanceResponse balanceOrUpdateMetrics() throws IOException {
2066    synchronized (this.balancer) {
2067      BalanceResponse response = balance();
2068      if (!response.isBalancerRan()) {
2069        Map<TableName, Map<ServerName, List<RegionInfo>>> assignments =
2070          this.assignmentManager.getRegionStates().getAssignmentsForBalancer(this.tableStateManager,
2071            this.serverManager.getOnlineServersList());
2072        for (Map<ServerName, List<RegionInfo>> serverMap : assignments.values()) {
2073          serverMap.keySet().removeAll(this.serverManager.getDrainingServersList());
2074        }
2075        this.balancer.updateBalancerLoadInfo(assignments);
2076      }
2077      return response;
2078    }
2079  }
2080
2081  /**
2082   * Checks master state before initiating action over region topology.
2083   * @param action the name of the action under consideration, for logging.
2084   * @return {@code true} when the caller should exit early, {@code false} otherwise.
2085   */
2086  @Override
2087  public boolean skipRegionManagementAction(final String action) {
2088    // Note: this method could be `default` on MasterServices if but for logging.
2089    if (!isInitialized()) {
2090      LOG.debug("Master has not been initialized, don't run {}.", action);
2091      return true;
2092    }
2093    if (this.getServerManager().isClusterShutdown()) {
2094      LOG.info("Cluster is shutting down, don't run {}.", action);
2095      return true;
2096    }
2097    if (isInMaintenanceMode()) {
2098      LOG.info("Master is in maintenance mode, don't run {}.", action);
2099      return true;
2100    }
2101    return false;
2102  }
2103
2104  public BalanceResponse balance(BalanceRequest request) throws IOException {
2105    checkInitialized();
2106
2107    BalanceResponse.Builder responseBuilder = BalanceResponse.newBuilder();
2108
2109    if (loadBalancerStateStore == null || !(loadBalancerStateStore.get() || request.isDryRun())) {
2110      return responseBuilder.build();
2111    }
2112
2113    if (skipRegionManagementAction("balancer")) {
2114      return responseBuilder.build();
2115    }
2116
2117    synchronized (this.balancer) {
2118      try {
2119        this.balancer.onBalancingStart();
2120        // Only allow one balance run at at time.
2121        if (this.assignmentManager.getRegionTransitScheduledCount() > 0) {
2122          List<RegionStateNode> regionsInTransition = assignmentManager.getRegionsInTransition();
2123          // if hbase:meta region is in transition, result of assignment cannot be recorded
2124          // ignore the force flag in that case
2125          boolean metaInTransition = assignmentManager.isMetaRegionInTransition();
2126          List<RegionStateNode> toPrint = regionsInTransition;
2127          int max = 5;
2128          boolean truncated = false;
2129          if (regionsInTransition.size() > max) {
2130            toPrint = regionsInTransition.subList(0, max);
2131            truncated = true;
2132          }
2133
2134          if (!request.isIgnoreRegionsInTransition() || metaInTransition) {
2135            LOG.info("Not running balancer (ignoreRIT=false" + ", metaRIT=" + metaInTransition
2136              + ") because " + assignmentManager.getRegionTransitScheduledCount()
2137              + " region(s) are scheduled to transit " + toPrint
2138              + (truncated ? "(truncated list)" : ""));
2139            return responseBuilder.build();
2140          }
2141        }
2142        if (this.serverManager.areDeadServersInProgress()) {
2143          LOG.info("Not running balancer because processing dead regionserver(s): "
2144            + this.serverManager.getDeadServers());
2145          return responseBuilder.build();
2146        }
2147
2148        if (this.cpHost != null) {
2149          try {
2150            if (this.cpHost.preBalance(request)) {
2151              LOG.debug("Coprocessor bypassing balancer request");
2152              return responseBuilder.build();
2153            }
2154          } catch (IOException ioe) {
2155            LOG.error("Error invoking master coprocessor preBalance()", ioe);
2156            return responseBuilder.build();
2157          }
2158        }
2159
2160        Map<TableName, Map<ServerName, List<RegionInfo>>> assignments =
2161          this.assignmentManager.getRegionStates().getAssignmentsForBalancer(tableStateManager,
2162            this.serverManager.getOnlineServersList());
2163        for (Map<ServerName, List<RegionInfo>> serverMap : assignments.values()) {
2164          serverMap.keySet().removeAll(this.serverManager.getDrainingServersList());
2165        }
2166
2167        // Give the balancer the current cluster state.
2168        this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());
2169
2170        List<RegionPlan> plans = this.balancer.balanceCluster(assignments);
2171
2172        responseBuilder.setBalancerRan(true).setMovesCalculated(plans == null ? 0 : plans.size());
2173
2174        if (skipRegionManagementAction("balancer")) {
2175          // make one last check that the cluster isn't shutting down before proceeding.
2176          return responseBuilder.build();
2177        }
2178
2179        // For dry run we don't actually want to execute the moves, but we do want
2180        // to execute the coprocessor below
2181        List<RegionPlan> sucRPs =
2182          request.isDryRun() ? Collections.emptyList() : executeRegionPlansWithThrottling(plans);
2183
2184        if (this.cpHost != null) {
2185          try {
2186            this.cpHost.postBalance(request, sucRPs);
2187          } catch (IOException ioe) {
2188            // balancing already succeeded so don't change the result
2189            LOG.error("Error invoking master coprocessor postBalance()", ioe);
2190          }
2191        }
2192
2193        responseBuilder.setMovesExecuted(sucRPs.size());
2194      } finally {
2195        this.balancer.onBalancingComplete();
2196      }
2197    }
2198
2199    // If LoadBalancer did not generate any plans, it means the cluster is already balanced.
2200    // Return true indicating a success.
2201    return responseBuilder.build();
2202  }
2203
2204  /**
2205   * Execute region plans with throttling
2206   * @param plans to execute
2207   * @return succeeded plans
2208   */
2209  public List<RegionPlan> executeRegionPlansWithThrottling(List<RegionPlan> plans) {
2210    List<RegionPlan> successRegionPlans = new ArrayList<>();
2211    int maxRegionsInTransition = getMaxRegionsInTransition();
2212    long balanceStartTime = EnvironmentEdgeManager.currentTime();
2213    long cutoffTime = balanceStartTime + this.maxBalancingTime;
2214    int rpCount = 0; // number of RegionPlans balanced so far
2215    if (plans != null && !plans.isEmpty()) {
2216      int balanceInterval = this.maxBalancingTime / plans.size();
2217      LOG.info(
2218        "Balancer plans size is " + plans.size() + ", the balance interval is " + balanceInterval
2219          + " ms, and the max number regions in transition is " + maxRegionsInTransition);
2220
2221      for (RegionPlan plan : plans) {
2222        LOG.info("balance " + plan);
2223        // TODO: bulk assign
2224        try {
2225          this.assignmentManager.balance(plan);
2226          this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());
2227          this.balancer.throttle(plan);
2228        } catch (HBaseIOException hioe) {
2229          // should ignore failed plans here, avoiding the whole balance plans be aborted
2230          // later calls of balance() can fetch up the failed and skipped plans
2231          LOG.warn("Failed balance plan {}, skipping...", plan, hioe);
2232        } catch (Exception e) {
2233          LOG.warn("Failed throttling assigning a new plan.", e);
2234        }
2235        // rpCount records balance plans processed, does not care if a plan succeeds
2236        rpCount++;
2237        successRegionPlans.add(plan);
2238
2239        if (this.maxBalancingTime > 0) {
2240          balanceThrottling(balanceStartTime + rpCount * balanceInterval, maxRegionsInTransition,
2241            cutoffTime);
2242        }
2243
2244        // if performing next balance exceeds cutoff time, exit the loop
2245        if (
2246          this.maxBalancingTime > 0 && rpCount < plans.size()
2247            && EnvironmentEdgeManager.currentTime() > cutoffTime
2248        ) {
2249          // TODO: After balance, there should not be a cutoff time (keeping it as
2250          // a security net for now)
2251          LOG.debug(
2252            "No more balancing till next balance run; maxBalanceTime=" + this.maxBalancingTime);
2253          break;
2254        }
2255      }
2256    }
2257    LOG.debug("Balancer is going into sleep until next period in {}ms", getConfiguration()
2258      .getInt(HConstants.HBASE_BALANCER_PERIOD, HConstants.DEFAULT_HBASE_BALANCER_PERIOD));
2259    return successRegionPlans;
2260  }
2261
  /** Returns the manager coordinating region-normalizer state and work. */
  @Override
  public RegionNormalizerManager getRegionNormalizerManager() {
    return regionNormalizerManager;
  }
2266
  /**
   * Submit tables matching the filter params for region normalization. Skipped when the
   * normalizer is off, when master state forbids region actions, or when any regions are
   * already scheduled to transit.
   * @param ntfp           namespace/regex/table-name filter for candidate tables
   * @param isHighPriority whether to queue the work at high priority
   * @return true when the work was accepted by the normalizer manager, false when skipped
   */
  @Override
  public boolean normalizeRegions(final NormalizeTableFilterParams ntfp,
    final boolean isHighPriority) throws IOException {
    if (regionNormalizerManager == null || !regionNormalizerManager.isNormalizerOn()) {
      LOG.debug("Region normalization is disabled, don't run region normalizer.");
      return false;
    }
    if (skipRegionManagementAction("region normalizer")) {
      return false;
    }
    // Don't compete with in-flight region transitions.
    if (assignmentManager.getRegionTransitScheduledCount() > 0) {
      return false;
    }

    // Resolve the filter to table names, then keep only ENABLED tables.
    final Set<TableName> matchingTables = getTableDescriptors(new LinkedList<>(),
      ntfp.getNamespace(), ntfp.getRegex(), ntfp.getTableNames(), false).stream()
      .map(TableDescriptor::getTableName).collect(Collectors.toSet());
    final Set<TableName> allEnabledTables =
      tableStateManager.getTablesInStates(TableState.State.ENABLED);
    final List<TableName> targetTables =
      new ArrayList<>(Sets.intersection(matchingTables, allEnabledTables));
    // Shuffle so no table is systematically normalized first across runs.
    Collections.shuffle(targetTables);
    return regionNormalizerManager.normalizeRegions(targetTables, isHighPriority);
  }
2291
2292  /** Returns Client info for use as prefix on an audit log string; who did an action */
2293  @Override
2294  public String getClientIdAuditPrefix() {
2295    return "Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
2296      + RpcServer.getRemoteAddress().orElse(null);
2297  }
2298
2299  /**
2300   * Switch for the background CatalogJanitor thread. Used for testing. The thread will continue to
2301   * run. It will just be a noop if disabled.
2302   * @param b If false, the catalog janitor won't do anything.
2303   */
2304  public void setCatalogJanitorEnabled(final boolean b) {
2305    this.catalogJanitorChore.setEnabled(b);
2306  }
2307
  /**
   * Merge the given regions via a {@code MergeTableRegionsProcedure}. Fails fast when the merge
   * switch is off or merge is disabled on the table (the table of the first region is used for
   * that check).
   * @param regionsToMerge regions to merge
   * @param forcible       passed through to the merge procedure
   * @param ng             nonce group for duplicate-request detection
   * @param nonce          nonce for duplicate-request detection
   * @return the procedure id of the submitted merge procedure
   * @throws DoNotRetryIOException when merging is switched off or disabled for the table
   */
  @Override
  public long mergeRegions(final RegionInfo[] regionsToMerge, final boolean forcible, final long ng,
    final long nonce) throws IOException {
    checkInitialized();

    final String regionNamesToLog = RegionInfo.getShortNameToLog(regionsToMerge);

    if (!isSplitOrMergeEnabled(MasterSwitchType.MERGE)) {
      LOG.warn("Merge switch is off! skip merge of " + regionNamesToLog);
      throw new DoNotRetryIOException(
        "Merge of " + regionNamesToLog + " failed because merge switch is off");
    }

    if (!getTableDescriptors().get(regionsToMerge[0].getTable()).isMergeEnabled()) {
      LOG.warn("Merge is disabled for the table! Skipping merge of {}", regionNamesToLog);
      throw new DoNotRetryIOException(
        "Merge of " + regionNamesToLog + " failed as region merge is disabled for the table");
    }

    return MasterProcedureUtil.submitProcedure(new NonceProcedureRunnable(this, ng, nonce) {
      @Override
      protected void run() throws IOException {
        getMaster().getMasterCoprocessorHost().preMergeRegions(regionsToMerge);
        String aid = getClientIdAuditPrefix();
        LOG.info("{} merge regions {}", aid, regionNamesToLog);
        submitProcedure(new MergeTableRegionsProcedure(procedureExecutor.getEnvironment(),
          regionsToMerge, forcible));
        getMaster().getMasterCoprocessorHost().postMergeRegions(regionsToMerge);
      }

      @Override
      protected String getDescription() {
        return "MergeTableProcedure";
      }
    });
  }
2344
  /**
   * Split the given region at the given row via a split procedure. Fails fast when the split
   * switch is off or split is disabled for the region's table.
   * @param regionInfo region to split
   * @param splitRow   row to split at; passed to the split procedure
   * @param nonceGroup nonce group for duplicate-request detection
   * @param nonce      nonce for duplicate-request detection
   * @return the procedure id of the submitted split procedure
   * @throws DoNotRetryIOException when splitting is switched off or disabled for the table
   */
  @Override
  public long splitRegion(final RegionInfo regionInfo, final byte[] splitRow, final long nonceGroup,
    final long nonce) throws IOException {
    checkInitialized();

    if (!isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) {
      LOG.warn("Split switch is off! skip split of " + regionInfo);
      throw new DoNotRetryIOException(
        "Split region " + regionInfo.getRegionNameAsString() + " failed due to split switch off");
    }

    if (!getTableDescriptors().get(regionInfo.getTable()).isSplitEnabled()) {
      LOG.warn("Split is disabled for the table! Skipping split of {}", regionInfo);
      throw new DoNotRetryIOException("Split region " + regionInfo.getRegionNameAsString()
        + " failed as region split is disabled for the table");
    }

    return MasterProcedureUtil
      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
        @Override
        protected void run() throws IOException {
          getMaster().getMasterCoprocessorHost().preSplitRegion(regionInfo.getTable(), splitRow);
          LOG.info(getClientIdAuditPrefix() + " split " + regionInfo.getRegionNameAsString());

          // Execute the operation asynchronously
          submitProcedure(getAssignmentManager().createSplitProcedure(regionInfo, splitRow));
        }

        @Override
        protected String getDescription() {
          return "SplitTableProcedure";
        }
      });
  }
2379
  /**
   * Asynchronously asks the given region server to warm up the given region. Best-effort:
   * a failure is only logged, never propagated to the caller.
   */
  private void warmUpRegion(ServerName server, RegionInfo region) {
    FutureUtils.addListener(asyncClusterConnection.getRegionServerAdmin(server)
      .warmupRegion(RequestConverter.buildWarmupRegionRequest(region)), (r, e) -> {
        if (e != null) {
          LOG.warn("Failed to warm up region {} on server {}", region, server, e);
        }
      });
  }
2388
  // Public so can be accessed by tests. Blocks until move is done.
  // Replace with an async implementation from which you can get
  // a success/failure result.
  @InterfaceAudience.Private
  public void move(final byte[] encodedRegionName, byte[] destServerName) throws IOException {
    // Resolve the region from its encoded name; unknown regions are an error.
    RegionState regionState =
      assignmentManager.getRegionStates().getRegionState(Bytes.toString(encodedRegionName));

    RegionInfo hri;
    if (regionState != null) {
      hri = regionState.getRegion();
    } else {
      throw new UnknownRegionException(Bytes.toStringBinary(encodedRegionName));
    }

    ServerName dest;
    // For system tables, some servers are excluded as destinations; for user tables the
    // exclude list starts empty (sized 1 for the source server added below).
    List<ServerName> exclude = hri.getTable().isSystemTable()
      ? assignmentManager.getExcludedServersForSystemTable()
      : new ArrayList<>(1);
    if (
      destServerName != null && exclude.contains(ServerName.valueOf(Bytes.toString(destServerName)))
    ) {
      // Requested destination is excluded: fall through to random assignment below.
      LOG.info(Bytes.toString(encodedRegionName) + " can not move to "
        + Bytes.toString(destServerName) + " because the server is in exclude list");
      destServerName = null;
    }
    if (destServerName == null || destServerName.length == 0) {
      LOG.info("Passed destination servername is null/empty so " + "choosing a server at random");
      // Never pick the current host as the random destination.
      exclude.add(regionState.getServerName());
      final List<ServerName> destServers = this.serverManager.createDestinationServersList(exclude);
      dest = balancer.randomAssignment(hri, destServers);
      if (dest == null) {
        LOG.debug("Unable to determine a plan to assign " + hri);
        return;
      }
    } else {
      // Let the balancer validate the explicit candidate (e.g. rs group constraints).
      ServerName candidate = ServerName.valueOf(Bytes.toString(destServerName));
      dest = balancer.randomAssignment(hri, Lists.newArrayList(candidate));
      if (dest == null) {
        LOG.debug("Unable to determine a plan to assign " + hri);
        return;
      }
      // TODO: deal with table on master for rs group.
      if (dest.equals(serverName)) {
        // To avoid unnecessary region moving later by balancer. Don't put user
        // regions on master.
        LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
          + " to avoid unnecessary region moving later by load balancer,"
          + " because it should not be on master");
        return;
      }
    }

    // Moving a region onto its current host is a no-op.
    if (dest.equals(regionState.getServerName())) {
      LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
        + " because region already assigned to the same server " + dest + ".");
      return;
    }

    // Now we can do the move
    RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest);
    assert rp.getDestination() != null : rp.toString() + " " + dest;

    try {
      checkInitialized();
      if (this.cpHost != null) {
        this.cpHost.preMove(hri, rp.getSource(), rp.getDestination());
      }

      TransitRegionStateProcedure proc =
        this.assignmentManager.createMoveRegionProcedure(rp.getRegionInfo(), rp.getDestination());
      if (conf.getBoolean(WARMUP_BEFORE_MOVE, DEFAULT_WARMUP_BEFORE_MOVE)) {
        // Warmup the region on the destination before initiating the move.
        // A region server could reject the close request because it either does not
        // have the specified region or the region is being split.
        LOG.info(getClientIdAuditPrefix() + " move " + rp + ", warming up region on "
          + rp.getDestination());
        warmUpRegion(rp.getDestination(), hri);
      }
      LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
      // Block until the move procedure completes (this method is documented as blocking).
      Future<byte[]> future = ProcedureSyncWait.submitProcedure(this.procedureExecutor, proc);
      try {
        // Is this going to work? Will we throw exception on error?
        // TODO: CompletableFuture rather than this stunted Future.
        future.get();
      } catch (InterruptedException | ExecutionException e) {
        throw new HBaseIOException(e);
      }
      if (this.cpHost != null) {
        this.cpHost.postMove(hri, rp.getSource(), rp.getDestination());
      }
    } catch (IOException ioe) {
      // Re-wrap so callers consistently see HBaseIOException, preserving the cause.
      if (ioe instanceof HBaseIOException) {
        throw (HBaseIOException) ioe;
      }
      throw new HBaseIOException(ioe);
    }
  }
2487
  /**
   * Create a table as described by the descriptor, pre-split at the given keys.
   * @param tableDescriptor table to create; may be replaced by the pre-create coprocessor hook
   * @param splitKeys       split points for the initial regions; null for a single region
   * @param nonceGroup      nonce group for duplicate-request detection
   * @param nonce           nonce for duplicate-request detection
   * @return the procedure id of the submitted create-table procedure
   * @throws IOException when a coprocessor cancels the creation or sanity checks fail
   */
  @Override
  public long createTable(final TableDescriptor tableDescriptor, final byte[][] splitKeys,
    final long nonceGroup, final long nonce) throws IOException {
    checkInitialized();
    // Coprocessors may rewrite the descriptor; a null return means the creation is vetoed.
    TableDescriptor desc = getMasterCoprocessorHost().preCreateTableRegionsInfos(tableDescriptor);
    if (desc == null) {
      throw new IOException("Creation for " + tableDescriptor + " is canceled by CP");
    }
    // Verify the namespace exists before going any further.
    String namespace = desc.getTableName().getNamespaceAsString();
    this.clusterSchemaService.getNamespace(namespace);

    RegionInfo[] newRegions = ModifyRegionUtils.createRegionInfos(desc, splitKeys);
    TableDescriptorChecker.sanityCheck(conf, desc);

    return MasterProcedureUtil
      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
        @Override
        protected void run() throws IOException {
          getMaster().getMasterCoprocessorHost().preCreateTable(desc, newRegions);

          LOG.info(getClientIdAuditPrefix() + " create " + desc);

          // TODO: We can handle/merge duplicate requests, and differentiate the case of
          // TableExistsException by saying if the schema is the same or not.
          //
          // We need to wait for the procedure to potentially fail due to "prepare" sanity
          // checks. This will block only the beginning of the procedure. See HBASE-19953.
          ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
          submitProcedure(
            new CreateTableProcedure(procedureExecutor.getEnvironment(), desc, newRegions, latch));
          latch.await();

          getMaster().getMasterCoprocessorHost().postCreateTable(desc, newRegions);
        }

        @Override
        protected String getDescription() {
          return "CreateTableProcedure";
        }
      });
  }
2529
  /** Creates a (non-critical) system table; see the private overload for details. */
  @Override
  public long createSystemTable(final TableDescriptor tableDescriptor) throws IOException {
    return createSystemTable(tableDescriptor, false);
  }
2534
  /**
   * Create a system table locally on the master, without nonce handling.
   * @param tableDescriptor descriptor of the system table to create
   * @param isCritical      marks the resulting procedure's table as a critical system table
   * @return the procedure id of the submitted create-table procedure
   * @throws MasterNotRunningException    when the master has been stopped
   * @throws IllegalArgumentException     when the table is not a system table
   */
  private long createSystemTable(final TableDescriptor tableDescriptor, final boolean isCritical)
    throws IOException {
    if (isStopped()) {
      throw new MasterNotRunningException();
    }

    TableName tableName = tableDescriptor.getTableName();
    if (!(tableName.isSystemTable())) {
      throw new IllegalArgumentException(
        "Only system table creation can use this createSystemTable API");
    }

    // System tables are never pre-split (null split keys → single region).
    RegionInfo[] newRegions = ModifyRegionUtils.createRegionInfos(tableDescriptor, null);

    LOG.info(getClientIdAuditPrefix() + " create " + tableDescriptor);

    // This special create table is called locally to master. Therefore, no RPC means no need
    // to use nonce to detect duplicated RPC call.
    CreateTableProcedure proc =
      new CreateTableProcedure(procedureExecutor.getEnvironment(), tableDescriptor, newRegions);
    proc.setCriticalSystemTable(isCritical);
    return this.procedureExecutor.submitProcedure(proc);
  }
2558
  /**
   * Registers this master as a backup in ZooKeeper, then blocks trying to become the active
   * master; on success runs active-master initialization. Any failure aborts the master.
   * @param infoPort the info-server port to publish alongside the master address
   */
  private void startActiveMasterManager(int infoPort) throws KeeperException {
    String backupZNode = ZNodePaths.joinZNode(zooKeeper.getZNodePaths().backupMasterAddressesZNode,
      serverName.toString());
    /*
     * Add a ZNode for ourselves in the backup master directory since we may not become the active
     * master. If so, we want the actual active master to know we are backup masters, so that it
     * won't assign regions to us if so configured. If we become the active master later,
     * ActiveMasterManager will delete this node explicitly. If we crash before then, ZooKeeper will
     * delete this node for us since it is ephemeral.
     */
    LOG.info("Adding backup master ZNode " + backupZNode);
    if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode, serverName, infoPort)) {
      LOG.warn("Failed create of " + backupZNode + " by " + serverName);
    }
    this.activeMasterManager.setInfoPort(infoPort);
    int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
    // If we're a backup master, stall until a primary has written the master address znode
    if (conf.getBoolean(HConstants.MASTER_TYPE_BACKUP, HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
      LOG.debug("HMaster started in backup mode. Stalling until master znode is written.");
      // This will only be a minute or so while the cluster starts up,
      // so don't worry about setting watches on the parent znode
      while (!activeMasterManager.hasActiveMaster()) {
        LOG.debug("Waiting for master address and cluster state znode to be written.");
        Threads.sleep(timeout);
      }
    }

    // Here for the master startup process, we use TaskGroup to monitor the whole progress.
    // The UI is similar to how Hadoop designed the startup page for the NameNode.
    // See HBASE-21521 for more details.
    // We do not cleanup the startupTaskGroup, let the startup progress information
    // be permanent in the MEM.
    startupTaskGroup = TaskMonitor.createTaskGroup(true, "Master startup");
    try {
      if (activeMasterManager.blockUntilBecomingActiveMaster(timeout, startupTaskGroup)) {
        finishActiveMasterInitialization();
      }
    } catch (Throwable t) {
      startupTaskGroup.abort("Failed to become active master due to:" + t.getMessage());
      LOG.error(HBaseMarkers.FATAL, "Failed to become active master", t);
      // HBASE-5680: Likely hadoop23 vs hadoop 20.x/1.x incompatibility
      if (
        t instanceof NoClassDefFoundError
          && t.getMessage().contains("org/apache/hadoop/hdfs/protocol/HdfsConstants$SafeModeAction")
      ) {
        // improved error message for this special case
        abort("HBase is having a problem with its Hadoop jars.  You may need to recompile "
          + "HBase against Hadoop version " + org.apache.hadoop.util.VersionInfo.getVersion()
          + " or change your hadoop jars to start properly", t);
      } else {
        abort("Unhandled exception. Starting shutdown.", t);
      }
    }
  }
2613
  /** Returns true when {@code tableName} is the hbase:meta catalog table. */
  private static boolean isCatalogTable(final TableName tableName) {
    return tableName.equals(TableName.META_TABLE_NAME);
  }
2617
  /**
   * Delete the given table via a DeleteTableProcedure.
   * @param tableName  table to delete
   * @param nonceGroup nonce group for duplicate-request detection
   * @param nonce      nonce for duplicate-request detection
   * @return the procedure id of the submitted delete procedure
   */
  @Override
  public long deleteTable(final TableName tableName, final long nonceGroup, final long nonce)
    throws IOException {
    checkInitialized();

    return MasterProcedureUtil
      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
        @Override
        protected void run() throws IOException {
          getMaster().getMasterCoprocessorHost().preDeleteTable(tableName);

          LOG.info(getClientIdAuditPrefix() + " delete " + tableName);

          // TODO: We can handle/merge duplicate request
          //
          // We need to wait for the procedure to potentially fail due to "prepare" sanity
          // checks. This will block only the beginning of the procedure. See HBASE-19953.
          ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
          submitProcedure(
            new DeleteTableProcedure(procedureExecutor.getEnvironment(), tableName, latch));
          latch.await();

          getMaster().getMasterCoprocessorHost().postDeleteTable(tableName);
        }

        @Override
        protected String getDescription() {
          return "DeleteTableProcedure";
        }
      });
  }
2649
  /**
   * Truncate the given table via a TruncateTableProcedure.
   * @param tableName      table to truncate
   * @param preserveSplits whether the existing region boundaries are recreated
   * @param nonceGroup     nonce group for duplicate-request detection
   * @param nonce          nonce for duplicate-request detection
   * @return the procedure id of the submitted truncate procedure
   */
  @Override
  public long truncateTable(final TableName tableName, final boolean preserveSplits,
    final long nonceGroup, final long nonce) throws IOException {
    checkInitialized();

    return MasterProcedureUtil
      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
        @Override
        protected void run() throws IOException {
          getMaster().getMasterCoprocessorHost().preTruncateTable(tableName);

          LOG.info(getClientIdAuditPrefix() + " truncate " + tableName);
          // Wait on the procedure's "prepare" phase before returning to the caller.
          ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0);
          submitProcedure(new TruncateTableProcedure(procedureExecutor.getEnvironment(), tableName,
            preserveSplits, latch));
          latch.await();

          getMaster().getMasterCoprocessorHost().postTruncateTable(tableName);
        }

        @Override
        protected String getDescription() {
          return "TruncateTableProcedure";
        }
      });
  }
2676
  /**
   * Truncates a single region via a {@link TruncateRegionProcedure}. Blocks only while the
   * procedure's prepare phase runs; the region truncation itself executes asynchronously.
   * @return the procedure id
   */
  @Override
  public long truncateRegion(final RegionInfo regionInfo, final long nonceGroup, final long nonce)
    throws IOException {
    checkInitialized();

    return MasterProcedureUtil
      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
        @Override
        protected void run() throws IOException {
          getMaster().getMasterCoprocessorHost().preTruncateRegion(regionInfo);

          LOG.info(
            getClientIdAuditPrefix() + " truncate region " + regionInfo.getRegionNameAsString());

          // Execute the operation asynchronously
          ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0);
          submitProcedure(
            new TruncateRegionProcedure(procedureExecutor.getEnvironment(), regionInfo, latch));
          latch.await();

          getMaster().getMasterCoprocessorHost().postTruncateRegion(regionInfo);
        }

        @Override
        protected String getDescription() {
          return "TruncateRegionProcedure";
        }
      });
  }
2706
2707  @Override
2708  public long addColumn(final TableName tableName, final ColumnFamilyDescriptor column,
2709    final long nonceGroup, final long nonce) throws IOException {
2710    checkInitialized();
2711    checkTableExists(tableName);
2712
2713    return modifyTable(tableName, new TableDescriptorGetter() {
2714
2715      @Override
2716      public TableDescriptor get() throws IOException {
2717        TableDescriptor old = getTableDescriptors().get(tableName);
2718        if (old.hasColumnFamily(column.getName())) {
2719          throw new InvalidFamilyOperationException("Column family '" + column.getNameAsString()
2720            + "' in table '" + tableName + "' already exists so cannot be added");
2721        }
2722
2723        return TableDescriptorBuilder.newBuilder(old).setColumnFamily(column).build();
2724      }
2725    }, nonceGroup, nonce, true);
2726  }
2727
2728  /**
2729   * Implement to return TableDescriptor after pre-checks
2730   */
2731  protected interface TableDescriptorGetter {
2732    TableDescriptor get() throws IOException;
2733  }
2734
2735  @Override
2736  public long modifyColumn(final TableName tableName, final ColumnFamilyDescriptor descriptor,
2737    final long nonceGroup, final long nonce) throws IOException {
2738    checkInitialized();
2739    checkTableExists(tableName);
2740    return modifyTable(tableName, new TableDescriptorGetter() {
2741
2742      @Override
2743      public TableDescriptor get() throws IOException {
2744        TableDescriptor old = getTableDescriptors().get(tableName);
2745        if (!old.hasColumnFamily(descriptor.getName())) {
2746          throw new InvalidFamilyOperationException("Family '" + descriptor.getNameAsString()
2747            + "' does not exist, so it cannot be modified");
2748        }
2749
2750        return TableDescriptorBuilder.newBuilder(old).modifyColumnFamily(descriptor).build();
2751      }
2752    }, nonceGroup, nonce, true);
2753  }
2754
  /**
   * Changes the store file tracker implementation for a single column family via a
   * {@link ModifyColumnFamilyStoreFileTrackerProcedure}. Submission is asynchronous; no prepare
   * latch is awaited here.
   * @param dstSFT the destination store file tracker name
   * @return the procedure id
   */
  @Override
  public long modifyColumnStoreFileTracker(TableName tableName, byte[] family, String dstSFT,
    long nonceGroup, long nonce) throws IOException {
    checkInitialized();
    return MasterProcedureUtil
      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {

        @Override
        protected void run() throws IOException {
          // The pre-hook may rewrite the requested tracker name; the rewritten value is what
          // the procedure actually applies.
          String sft = getMaster().getMasterCoprocessorHost()
            .preModifyColumnFamilyStoreFileTracker(tableName, family, dstSFT);
          LOG.info("{} modify column {} store file tracker of table {} to {}",
            getClientIdAuditPrefix(), Bytes.toStringBinary(family), tableName, sft);
          submitProcedure(new ModifyColumnFamilyStoreFileTrackerProcedure(
            procedureExecutor.getEnvironment(), tableName, family, sft));
          getMaster().getMasterCoprocessorHost().postModifyColumnFamilyStoreFileTracker(tableName,
            family, dstSFT);
        }

        @Override
        protected String getDescription() {
          return "ModifyColumnFamilyStoreFileTrackerProcedure";
        }
      });
  }
2780
2781  @Override
2782  public long deleteColumn(final TableName tableName, final byte[] columnName,
2783    final long nonceGroup, final long nonce) throws IOException {
2784    checkInitialized();
2785    checkTableExists(tableName);
2786
2787    return modifyTable(tableName, new TableDescriptorGetter() {
2788
2789      @Override
2790      public TableDescriptor get() throws IOException {
2791        TableDescriptor old = getTableDescriptors().get(tableName);
2792
2793        if (!old.hasColumnFamily(columnName)) {
2794          throw new InvalidFamilyOperationException(
2795            "Family '" + Bytes.toString(columnName) + "' does not exist, so it cannot be deleted");
2796        }
2797        if (old.getColumnFamilyCount() == 1) {
2798          throw new InvalidFamilyOperationException("Family '" + Bytes.toString(columnName)
2799            + "' is the only column family in the table, so it cannot be deleted");
2800        }
2801        return TableDescriptorBuilder.newBuilder(old).removeColumnFamily(columnName).build();
2802      }
2803    }, nonceGroup, nonce, true);
2804  }
2805
  /**
   * Enables the given table via an {@link EnableTableProcedure}. Before submission, refuses to
   * enable a non-system table whose space quota is in violation with the DISABLE policy. Blocks
   * until the prepare latch releases (table locked, state set), then continues asynchronously.
   * @return the procedure id
   * @throws AccessDeniedException if a DISABLE-policy space quota is in violation for the table
   */
  @Override
  public long enableTable(final TableName tableName, final long nonceGroup, final long nonce)
    throws IOException {
    checkInitialized();

    return MasterProcedureUtil
      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
        @Override
        protected void run() throws IOException {
          getMaster().getMasterCoprocessorHost().preEnableTable(tableName);

          // Normally, it would make sense for this authorization check to exist inside
          // AccessController, but because the authorization check is done based on internal state
          // (rather than explicit permissions) we'll do the check here instead of in the
          // coprocessor.
          MasterQuotaManager quotaManager = getMasterQuotaManager();
          if (quotaManager != null) {
            if (quotaManager.isQuotaInitialized()) {
              // skip checking quotas for system tables, see:
              // https://issues.apache.org/jira/browse/HBASE-28183
              if (!tableName.isSystemTable()) {
                SpaceQuotaSnapshot currSnapshotOfTable =
                  QuotaTableUtil.getCurrentSnapshotFromQuotaTable(getConnection(), tableName);
                if (currSnapshotOfTable != null) {
                  SpaceQuotaStatus quotaStatus = currSnapshotOfTable.getQuotaStatus();
                  if (
                    quotaStatus.isInViolation()
                      && SpaceViolationPolicy.DISABLE == quotaStatus.getPolicy().orElse(null)
                  ) {
                    throw new AccessDeniedException("Enabling the table '" + tableName
                      + "' is disallowed due to a violated space quota.");
                  }
                }
              }
            } else if (LOG.isTraceEnabled()) {
              LOG
                .trace("Unable to check for space quotas as the MasterQuotaManager is not enabled");
            }
          }

          LOG.info(getClientIdAuditPrefix() + " enable " + tableName);

          // Execute the operation asynchronously - client will check the progress of the operation
          // In case the request is from a <1.1 client before returning,
          // we want to make sure that the table is prepared to be
          // enabled (the table is locked and the table state is set).
          // Note: if the procedure throws exception, we will catch it and rethrow.
          final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
          submitProcedure(
            new EnableTableProcedure(procedureExecutor.getEnvironment(), tableName, prepareLatch));
          prepareLatch.await();

          getMaster().getMasterCoprocessorHost().postEnableTable(tableName);
        }

        @Override
        protected String getDescription() {
          return "EnableTableProcedure";
        }
      });
  }
2867
  /**
   * Disables the given table via a {@link DisableTableProcedure}. Blocks until the blocking
   * prepare latch releases so "prepare" sanity failures are rethrown to the caller; the rest of
   * the disable runs asynchronously.
   * @return the procedure id
   */
  @Override
  public long disableTable(final TableName tableName, final long nonceGroup, final long nonce)
    throws IOException {
    checkInitialized();

    return MasterProcedureUtil
      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
        @Override
        protected void run() throws IOException {
          getMaster().getMasterCoprocessorHost().preDisableTable(tableName);

          LOG.info(getClientIdAuditPrefix() + " disable " + tableName);

          // Execute the operation asynchronously - client will check the progress of the operation
          // In case the request is from a <1.1 client before returning,
          // we want to make sure that the table is prepared to be
          // enabled (the table is locked and the table state is set).
          // Note: if the procedure throws exception, we will catch it and rethrow.
          //
          // We need to wait for the procedure to potentially fail due to "prepare" sanity
          // checks. This will block only the beginning of the procedure. See HBASE-19953.
          final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createBlockingLatch();
          submitProcedure(new DisableTableProcedure(procedureExecutor.getEnvironment(), tableName,
            false, prepareLatch));
          prepareLatch.await();

          getMaster().getMasterCoprocessorHost().postDisableTable(tableName);
        }

        @Override
        protected String getDescription() {
          return "DisableTableProcedure";
        }
      });
  }
2903
  /**
   * Convenience overload of {@link #modifyTable(TableName, TableDescriptorGetter, long, long,
   * boolean, boolean)} that always reopens the table's regions after the modification.
   */
  private long modifyTable(final TableName tableName,
    final TableDescriptorGetter newDescriptorGetter, final long nonceGroup, final long nonce,
    final boolean shouldCheckDescriptor) throws IOException {
    return modifyTable(tableName, newDescriptorGetter, nonceGroup, nonce, shouldCheckDescriptor,
      true);
  }

  /**
   * Core implementation behind all table/column modification entry points. Invokes the coprocessor
   * pre-hook (which may rewrite the descriptor), sanity-checks the result, and submits a
   * {@link ModifyTableProcedure}, blocking until the prepare phase completes.
   * @param newDescriptorGetter lazily supplies (and validates) the new descriptor
   * @param shouldCheckDescriptor whether the procedure should re-verify the stored descriptor
   * @param reopenRegions whether regions are reopened to pick up the change
   * @return the procedure id
   */
  private long modifyTable(final TableName tableName,
    final TableDescriptorGetter newDescriptorGetter, final long nonceGroup, final long nonce,
    final boolean shouldCheckDescriptor, final boolean reopenRegions) throws IOException {
    return MasterProcedureUtil
      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
        @Override
        protected void run() throws IOException {
          TableDescriptor oldDescriptor = getMaster().getTableDescriptors().get(tableName);
          // The pre-hook sees both old and proposed descriptors and may return a replacement.
          TableDescriptor newDescriptor = getMaster().getMasterCoprocessorHost()
            .preModifyTable(tableName, oldDescriptor, newDescriptorGetter.get());
          TableDescriptorChecker.sanityCheck(conf, newDescriptor);
          LOG.info("{} modify table {} from {} to {}", getClientIdAuditPrefix(), tableName,
            oldDescriptor, newDescriptor);

          // Execute the operation synchronously - wait for the operation completes before
          // continuing.
          //
          // We need to wait for the procedure to potentially fail due to "prepare" sanity
          // checks. This will block only the beginning of the procedure. See HBASE-19953.
          ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
          submitProcedure(new ModifyTableProcedure(procedureExecutor.getEnvironment(),
            newDescriptor, latch, oldDescriptor, shouldCheckDescriptor, reopenRegions));
          latch.await();

          getMaster().getMasterCoprocessorHost().postModifyTable(tableName, oldDescriptor,
            newDescriptor);
        }

        @Override
        protected String getDescription() {
          return "ModifyTableProcedure";
        }
      });

  }
2946
2947  @Override
2948  public long modifyTable(final TableName tableName, final TableDescriptor newDescriptor,
2949    final long nonceGroup, final long nonce, final boolean reopenRegions) throws IOException {
2950    checkInitialized();
2951    return modifyTable(tableName, new TableDescriptorGetter() {
2952      @Override
2953      public TableDescriptor get() throws IOException {
2954        return newDescriptor;
2955      }
2956    }, nonceGroup, nonce, false, reopenRegions);
2957
2958  }
2959
  /**
   * Changes the store file tracker implementation for an entire table via a
   * {@link ModifyTableStoreFileTrackerProcedure}. Submission is asynchronous; no prepare latch is
   * awaited here.
   * @param dstSFT the destination store file tracker name
   * @return the procedure id
   */
  @Override
  public long modifyTableStoreFileTracker(TableName tableName, String dstSFT, long nonceGroup,
    long nonce) throws IOException {
    checkInitialized();
    return MasterProcedureUtil
      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {

        @Override
        protected void run() throws IOException {
          // The pre-hook may rewrite the requested tracker name; the rewritten value is applied.
          String sft = getMaster().getMasterCoprocessorHost()
            .preModifyTableStoreFileTracker(tableName, dstSFT);
          LOG.info("{} modify table store file tracker of table {} to {}", getClientIdAuditPrefix(),
            tableName, sft);
          submitProcedure(new ModifyTableStoreFileTrackerProcedure(
            procedureExecutor.getEnvironment(), tableName, sft));
          getMaster().getMasterCoprocessorHost().postModifyTableStoreFileTracker(tableName, sft);
        }

        @Override
        protected String getDescription() {
          return "ModifyTableStoreFileTrackerProcedure";
        }
      });
  }
2984
  /**
   * Restores or clones the given snapshot through the {@link SnapshotManager}. Verifies snapshot
   * support and that the destination namespace exists before submitting the procedure.
   * @param restoreAcl whether to also restore the snapshot's ACLs
   * @param customSFT optional store file tracker to apply to the restored table
   * @return the procedure id
   */
  public long restoreSnapshot(final SnapshotDescription snapshotDesc, final long nonceGroup,
    final long nonce, final boolean restoreAcl, final String customSFT) throws IOException {
    checkInitialized();
    getSnapshotManager().checkSnapshotSupport();

    // Ensure namespace exists. Will throw exception if non-known NS.
    final TableName dstTable = TableName.valueOf(snapshotDesc.getTable());
    getClusterSchema().getNamespace(dstTable.getNamespaceAsString());

    return MasterProcedureUtil
      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
        @Override
        protected void run() throws IOException {
          setProcId(getSnapshotManager().restoreOrCloneSnapshot(snapshotDesc, getNonceKey(),
            restoreAcl, customSFT));
        }

        @Override
        protected String getDescription() {
          return "RestoreSnapshotProcedure";
        }
      });
  }
3008
  /**
   * Verifies the table has a descriptor on disk.
   * @throws TableNotFoundException if no descriptor exists for {@code tableName}
   */
  private void checkTableExists(final TableName tableName)
    throws IOException, TableNotFoundException {
    if (!tableDescriptors.exists(tableName)) {
      throw new TableNotFoundException(tableName);
    }
  }
3015
  /**
   * Verifies the table may be modified: it must not be a catalog table, must exist, and must be
   * in the DISABLED state.
   * @throws IOException if the table is a catalog table
   * @throws TableNotFoundException if the table does not exist
   * @throws TableNotDisabledException if the table is not disabled
   */
  @Override
  public void checkTableModifiable(final TableName tableName)
    throws IOException, TableNotFoundException, TableNotDisabledException {
    if (isCatalogTable(tableName)) {
      throw new IOException("Can't modify catalog tables");
    }
    checkTableExists(tableName);
    TableState ts = getTableStateManager().getTableState(tableName);
    if (!ts.isDisabled()) {
      throw new TableNotDisabledException("Not DISABLED; " + ts);
    }
  }
3028
3029  public void reloadRegionServerQuotas() {
3030    // multiple reloads are harmless, so no need for NonceProcedureRunnable
3031    getLiveRegionServers()
3032      .forEach(sn -> procedureExecutor.submitProcedure(new ReloadQuotasProcedure(sn)));
3033  }
3034
  /**
   * Builds cluster metrics covering every {@link Option}, without invoking coprocessor hooks.
   */
  public ClusterMetrics getClusterMetricsWithoutCoprocessor() throws InterruptedIOException {
    return getClusterMetricsWithoutCoprocessor(EnumSet.allOf(Option.class));
  }
3038
  /**
   * Builds a {@link ClusterMetrics} snapshot restricted to the requested options, without invoking
   * coprocessor pre/post hooks. An empty option set is treated as "all options" for compatibility
   * with HBase 1.x clients that cannot send options.
   * @param options which metric categories to populate
   */
  public ClusterMetrics getClusterMetricsWithoutCoprocessor(EnumSet<Option> options)
    throws InterruptedIOException {
    ClusterMetricsBuilder builder = ClusterMetricsBuilder.newBuilder();
    // given that hbase1 can't submit the request with Option,
    // we return all information to client if the list of Option is empty.
    if (options.isEmpty()) {
      options = EnumSet.allOf(Option.class);
    }

    // TASKS and/or LIVE_SERVERS will populate this map, which will be given to the builder if
    // not null after option processing completes.
    Map<ServerName, ServerMetrics> serverMetricsMap = null;

    for (Option opt : options) {
      switch (opt) {
        case HBASE_VERSION:
          builder.setHBaseVersion(VersionInfo.getVersion());
          break;
        case CLUSTER_ID:
          builder.setClusterId(getClusterId());
          break;
        case MASTER:
          builder.setMasterName(getServerName());
          break;
        case BACKUP_MASTERS:
          builder.setBackerMasterNames(getBackupMasters());
          break;
        case TASKS: {
          // Master tasks
          builder.setMasterTasks(TaskMonitor.get().getTasks().stream()
            .map(task -> ServerTaskBuilder.newBuilder().setDescription(task.getDescription())
              .setStatus(task.getStatus())
              .setState(ServerTask.State.valueOf(task.getState().name()))
              .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTimestamp())
              .build())
            .collect(Collectors.toList()));
          // TASKS is also synonymous with LIVE_SERVERS for now because task information for
          // regionservers is carried in ServerLoad.
          // Add entries to serverMetricsMap for all live servers, if we haven't already done so
          if (serverMetricsMap == null) {
            serverMetricsMap = getOnlineServers();
          }
          break;
        }
        case LIVE_SERVERS: {
          // Add entries to serverMetricsMap for all live servers, if we haven't already done so
          if (serverMetricsMap == null) {
            serverMetricsMap = getOnlineServers();
          }
          break;
        }
        case DEAD_SERVERS: {
          // Most of the remaining cases null-check the collaborating component because this can
          // run before the master is fully initialized.
          if (serverManager != null) {
            builder.setDeadServerNames(
              new ArrayList<>(serverManager.getDeadServers().copyServerNames()));
          }
          break;
        }
        case UNKNOWN_SERVERS: {
          if (serverManager != null) {
            builder.setUnknownServerNames(getUnknownServers());
          }
          break;
        }
        case MASTER_COPROCESSORS: {
          if (cpHost != null) {
            builder.setMasterCoprocessorNames(Arrays.asList(getMasterCoprocessors()));
          }
          break;
        }
        case REGIONS_IN_TRANSITION: {
          if (assignmentManager != null) {
            builder.setRegionsInTransition(
              new ArrayList<>(assignmentManager.getRegionsStateInTransition()));
          }
          break;
        }
        case BALANCER_ON: {
          if (loadBalancerStateStore != null) {
            builder.setBalancerOn(loadBalancerStateStore.get());
          }
          break;
        }
        case MASTER_INFO_PORT: {
          if (infoServer != null) {
            builder.setMasterInfoPort(infoServer.getPort());
          }
          break;
        }
        case SERVERS_NAME: {
          if (serverManager != null) {
            builder.setServerNames(serverManager.getOnlineServersList());
          }
          break;
        }
        case TABLE_TO_REGIONS_COUNT: {
          // Only meaningful on an initialized active master; otherwise region states are not
          // authoritative.
          if (isActiveMaster() && isInitialized() && assignmentManager != null) {
            try {
              Map<TableName, RegionStatesCount> tableRegionStatesCountMap = new HashMap<>();
              Map<String, TableDescriptor> tableDescriptorMap = getTableDescriptors().getAll();
              for (TableDescriptor tableDescriptor : tableDescriptorMap.values()) {
                TableName tableName = tableDescriptor.getTableName();
                RegionStatesCount regionStatesCount =
                  assignmentManager.getRegionStatesCount(tableName);
                tableRegionStatesCountMap.put(tableName, regionStatesCount);
              }
              builder.setTableRegionStatesCount(tableRegionStatesCountMap);
            } catch (IOException e) {
              // Best-effort: log and continue so the rest of the metrics are still returned.
              LOG.error("Error while populating TABLE_TO_REGIONS_COUNT for Cluster Metrics..", e);
            }
          }
          break;
        }
        case DECOMMISSIONED_SERVERS: {
          if (serverManager != null) {
            builder.setDecommissionedServerNames(serverManager.getDrainingServersList());
          }
          break;
        }
      }
    }

    if (serverMetricsMap != null) {
      builder.setLiveServerMetrics(serverMetricsMap);
    }

    return builder.build();
  }
3167
3168  private List<ServerName> getUnknownServers() {
3169    if (serverManager != null) {
3170      final Set<ServerName> serverNames = getAssignmentManager().getRegionStates().getRegionStates()
3171        .stream().map(RegionState::getServerName).collect(Collectors.toSet());
3172      final List<ServerName> unknownServerNames = serverNames.stream()
3173        .filter(sn -> sn != null && serverManager.isServerUnknown(sn)).collect(Collectors.toList());
3174      return unknownServerNames;
3175    }
3176    return null;
3177  }
3178
3179  private Map<ServerName, ServerMetrics> getOnlineServers() {
3180    if (serverManager != null) {
3181      final Map<ServerName, ServerMetrics> map = new HashMap<>();
3182      serverManager.getOnlineServers().entrySet().forEach(e -> map.put(e.getKey(), e.getValue()));
3183      return map;
3184    }
3185    return null;
3186  }
3187
  /** Returns cluster status covering all {@link Option}s, with coprocessor hooks applied. */
  public ClusterMetrics getClusterMetrics() throws IOException {
    return getClusterMetrics(EnumSet.allOf(Option.class));
  }

  /**
   * Builds cluster metrics for the requested options, invoking the coprocessor pre/post hooks
   * around the computation when a coprocessor host is present.
   */
  public ClusterMetrics getClusterMetrics(EnumSet<Option> options) throws IOException {
    if (cpHost != null) {
      cpHost.preGetClusterMetrics();
    }
    ClusterMetrics status = getClusterMetricsWithoutCoprocessor(options);
    if (cpHost != null) {
      cpHost.postGetClusterMetrics(status);
    }
    return status;
  }
3203
  /** Returns info port of active master or 0 if any exception occurs. */
  public int getActiveMasterInfoPort() {
    return activeMasterManager.getActiveMasterInfoPort();
  }

  /**
   * @param sn is ServerName of the backup master
   * @return info port of backup master or 0 if any exception occurs.
   */
  public int getBackupMasterInfoPort(final ServerName sn) {
    return activeMasterManager.getBackupMasterInfoPort(sn);
  }

  /**
   * The set of loaded coprocessors is stored in a static set. Since it's statically allocated, it
   * does not require that HMaster's cpHost be initialized prior to accessing it.
   * @return a String representation of the set of names of the loaded coprocessors.
   */
  public static String getLoadedCoprocessors() {
    return CoprocessorHost.getLoadedCoprocessors().toString();
  }

  /** Returns timestamp in millis when HMaster was started. */
  public long getMasterStartTime() {
    return startcode;
  }

  /** Returns timestamp in millis when HMaster became the active master. */
  @Override
  public long getMasterActiveTime() {
    return masterActiveTime;
  }

  /** Returns timestamp in millis when HMaster finished becoming the active master */
  public long getMasterFinishedInitializationTime() {
    return masterFinishedInitializationTime;
  }

  /** Returns the number of WAL files; this implementation always reports 0. */
  public int getNumWALFiles() {
    return 0;
  }

  /** Returns the store backing the procedure framework's persisted state. */
  public ProcedureStore getProcedureStore() {
    return procedureStore;
  }
3249
3250  public int getRegionServerInfoPort(final ServerName sn) {
3251    int port = this.serverManager.getInfoPort(sn);
3252    return port == 0
3253      ? conf.getInt(HConstants.REGIONSERVER_INFO_PORT, HConstants.DEFAULT_REGIONSERVER_INFOPORT)
3254      : port;
3255  }
3256
  /**
   * Returns the version string of the given region server as tracked by the
   * {@link ServerManager}.
   */
  @Override
  public String getRegionServerVersion(ServerName sn) {
    // Will return "0.0.0" if the server is not online to prevent move system region to unknown
    // version RS.
    return this.serverManager.getVersion(sn);
  }

  /** Asynchronously checks whether system regions should be moved; delegates to the AM. */
  @Override
  public void checkIfShouldMoveSystemRegionAsync() {
    assignmentManager.checkIfShouldMoveSystemRegionAsync();
  }
3268
3269  /** Returns array of coprocessor SimpleNames. */
3270  public String[] getMasterCoprocessors() {
3271    Set<String> masterCoprocessors = getMasterCoprocessorHost().getCoprocessors();
3272    return masterCoprocessors.toArray(new String[masterCoprocessors.size()]);
3273  }
3274
  /**
   * Aborts the master: logs loaded coprocessors (to help diagnose coprocessor-induced crashes,
   * see HBASE-4014), logs the fatal reason/cause, and stops the master. Re-entrant calls while
   * already aborting or stopped are ignored.
   */
  @Override
  public void abort(String reason, Throwable cause) {
    if (!setAbortRequested() || isStopped()) {
      LOG.debug("Abort called but aborted={}, stopped={}", isAborted(), isStopped());
      return;
    }
    if (cpHost != null) {
      // HBASE-4014: dump a list of loaded coprocessors.
      LOG.error(HBaseMarkers.FATAL,
        "Master server abort: loaded coprocessors are: " + getLoadedCoprocessors());
    }
    String msg = "***** ABORTING master " + this + ": " + reason + " *****";
    if (cause != null) {
      LOG.error(HBaseMarkers.FATAL, msg, cause);
    } else {
      LOG.error(HBaseMarkers.FATAL, msg);
    }

    try {
      stopMaster();
    } catch (IOException e) {
      // Abort must proceed even if the orderly stop fails; just record the failure.
      LOG.error("Exception occurred while stopping master", e);
    }
  }
3299
  /** Returns the master coprocessor host, or null if not yet initialized. */
  @Override
  public MasterCoprocessorHost getMasterCoprocessorHost() {
    return cpHost;
  }

  /** Returns the master quota manager, or null if not yet initialized. */
  @Override
  public MasterQuotaManager getMasterQuotaManager() {
    return quotaManager;
  }

  /** Returns the executor that runs master procedures. */
  @Override
  public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
    return procedureExecutor;
  }

  /** Returns this master's server name. */
  @Override
  public ServerName getServerName() {
    return this.serverName;
  }

  /** Returns the assignment manager. */
  @Override
  public AssignmentManager getAssignmentManager() {
    return this.assignmentManager;
  }

  /** Returns the catalog janitor chore. */
  @Override
  public CatalogJanitor getCatalogJanitor() {
    return this.catalogJanitorChore;
  }

  /** Returns the bounded buffer holding fatal errors reported by region servers. */
  public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
    return rsFatals;
  }

  /** Returns the task group tracking master startup progress. */
  public TaskGroup getStartupProgress() {
    return startupTaskGroup;
  }
3337
3338  /**
3339   * Shutdown the cluster. Master runs a coordinated stop of all RegionServers and then itself.
3340   */
3341  public void shutdown() throws IOException {
3342    TraceUtil.trace(() -> {
3343      if (cpHost != null) {
3344        cpHost.preShutdown();
3345      }
3346
3347      // Tell the servermanager cluster shutdown has been called. This makes it so when Master is
3348      // last running server, it'll stop itself. Next, we broadcast the cluster shutdown by setting
3349      // the cluster status as down. RegionServers will notice this change in state and will start
3350      // shutting themselves down. When last has exited, Master can go down.
3351      if (this.serverManager != null) {
3352        this.serverManager.shutdownCluster();
3353      }
3354      if (this.clusterStatusTracker != null) {
3355        try {
3356          this.clusterStatusTracker.setClusterDown();
3357        } catch (KeeperException e) {
3358          LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
3359        }
3360      }
3361      // Stop the procedure executor. Will stop any ongoing assign, unassign, server crash etc.,
3362      // processing so we can go down.
3363      if (this.procedureExecutor != null) {
3364        this.procedureExecutor.stop();
3365      }
3366      // Shutdown our cluster connection. This will kill any hosted RPCs that might be going on;
3367      // this is what we want especially if the Master is in startup phase doing call outs to
3368      // hbase:meta, etc. when cluster is down. Without ths connection close, we'd have to wait on
3369      // the rpc to timeout.
3370      if (this.asyncClusterConnection != null) {
3371        this.asyncClusterConnection.close();
3372      }
3373    }, "HMaster.shutdown");
3374  }
3375
  /** Stops only this master (not the cluster), invoking the coprocessor pre-hook first. */
  public void stopMaster() throws IOException {
    if (cpHost != null) {
      cpHost.preStopMaster();
    }
    stop("Stopped by " + Thread.currentThread().getName());
  }

  /**
   * Marks this master stopped, wakes the main run loop, and stops the active-master manager.
   * Idempotent: subsequent calls after the first are no-ops.
   */
  @Override
  public void stop(String msg) {
    if (!this.stopped) {
      LOG.info("***** STOPPING master '" + this + "' *****");
      this.stopped = true;
      LOG.info("STOPPED: " + msg);
      // Wakes run() if it is sleeping
      sleeper.skipSleepCycle();
      if (this.activeMasterManager != null) {
        this.activeMasterManager.stop();
      }
    }
  }

  /**
   * Ensures the RPC services have started.
   * @throws ServerNotRunningYetException if the server has not started serving yet
   */
  protected void checkServiceStarted() throws ServerNotRunningYetException {
    if (!serviceStarted) {
      throw new ServerNotRunningYetException("Server is not running yet");
    }
  }
3402
  /**
   * Ensures the master is started, fully initialized, and not stopped.
   * @throws ServerNotRunningYetException if services have not started
   * @throws PleaseHoldException if initialization is still in progress
   * @throws MasterStoppedException if the master has been stopped
   */
  void checkInitialized() throws PleaseHoldException, ServerNotRunningYetException,
    MasterNotRunningException, MasterStoppedException {
    checkServiceStarted();
    if (!isInitialized()) {
      throw new PleaseHoldException("Master is initializing");
    }
    if (isStopped()) {
      throw new MasterStoppedException();
    }
  }
3413
3414  /**
3415   * Report whether this master is currently the active master or not. If not active master, we are
3416   * parked on ZK waiting to become active. This method is used for testing.
3417   * @return true if active master, false if not.
3418   */
3419  @Override
3420  public boolean isActiveMaster() {
3421    return activeMaster;
3422  }
3423
3424  /**
3425   * Report whether this master has completed with its initialization and is ready. If ready, the
3426   * master is also the active master. A standby master is never ready. This method is used for
3427   * testing.
3428   * @return true if master is ready to go, false if not.
3429   */
3430  @Override
3431  public boolean isInitialized() {
3432    return initialized.isReady();
3433  }
3434
3435  /**
3436   * Report whether this master is started This method is used for testing.
3437   * @return true if master is ready to go, false if not.
3438   */
3439  public boolean isOnline() {
3440    return serviceStarted;
3441  }
3442
3443  /**
3444   * Report whether this master is in maintenance mode.
3445   * @return true if master is in maintenanceMode
3446   */
3447  @Override
3448  public boolean isInMaintenanceMode() {
3449    return maintenanceMode;
3450  }
3451
  /**
   * Mark the master initialized (or not) by toggling the initialization {@link ProcedureEvent}
   * that gates procedures waiting on master startup.
   */
  public void setInitialized(boolean isInitialized) {
    procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized);
  }

  /**
   * Mainly used in procedure related tests, where we will restart ProcedureExecutor and
   * AssignmentManager, but we do not want to restart master(to speed up the test), so we need to
   * disable rpc for a while otherwise some critical rpc requests such as
   * reportRegionStateTransition could fail and cause region server to abort.
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public void setServiceStarted(boolean started) {
    this.serviceStarted = started;
  }

  /** Returns the event that is set ready once master initialization completes. */
  @Override
  public ProcedureEvent<?> getInitializedEvent() {
    return initialized;
  }
3472
3473  /**
3474   * Compute the average load across all region servers. Currently, this uses a very naive
3475   * computation - just uses the number of regions being served, ignoring stats about number of
3476   * requests.
3477   * @return the average load
3478   */
3479  public double getAverageLoad() {
3480    if (this.assignmentManager == null) {
3481      return 0;
3482    }
3483
3484    RegionStates regionStates = this.assignmentManager.getRegionStates();
3485    if (regionStates == null) {
3486      return 0;
3487    }
3488    return regionStates.getAverageLoad();
3489  }
3490
3491  @Override
3492  public boolean registerService(Service instance) {
3493    /*
3494     * No stacking of instances is allowed for a single service name
3495     */
3496    Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
3497    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
3498    if (coprocessorServiceHandlers.containsKey(serviceName)) {
3499      LOG.error("Coprocessor service " + serviceName
3500        + " already registered, rejecting request from " + instance);
3501      return false;
3502    }
3503
3504    coprocessorServiceHandlers.put(serviceName, instance);
3505    if (LOG.isDebugEnabled()) {
3506      LOG.debug("Registered master coprocessor service: service=" + serviceName);
3507    }
3508    return true;
3509  }
3510
3511  /**
3512   * Utility for constructing an instance of the passed HMaster class.
3513   * @return HMaster instance.
3514   */
3515  public static HMaster constructMaster(Class<? extends HMaster> masterClass,
3516    final Configuration conf) {
3517    try {
3518      Constructor<? extends HMaster> c = masterClass.getConstructor(Configuration.class);
3519      return c.newInstance(conf);
3520    } catch (Exception e) {
3521      Throwable error = e;
3522      if (
3523        e instanceof InvocationTargetException
3524          && ((InvocationTargetException) e).getTargetException() != null
3525      ) {
3526        error = ((InvocationTargetException) e).getTargetException();
3527      }
3528      throw new RuntimeException("Failed construction of Master: " + masterClass.toString() + ". ",
3529        error);
3530    }
3531  }
3532
3533  /**
3534   * @see org.apache.hadoop.hbase.master.HMasterCommandLine
3535   */
3536  public static void main(String[] args) {
3537    LOG.info("STARTING service " + HMaster.class.getSimpleName());
3538    VersionInfo.logVersion();
3539    new HMasterCommandLine(HMaster.class).doMain(args);
3540  }
3541
  // NOTE(review): returns the first cleaner; assumes hfileCleaners has at least one entry —
  // callers should only invoke this after the cleaners have been set up.
  public HFileCleaner getHFileCleaner() {
    return this.hfileCleaners.get(0);
  }

  /** Returns all configured HFile cleaner chores. */
  public List<HFileCleaner> getHFileCleaners() {
    return this.hfileCleaners;
  }

  /** Returns the log cleaner chore. */
  public LogCleaner getLogCleaner() {
    return this.logCleaner;
  }
3553
  /** Returns the underlying snapshot manager */
  @Override
  public SnapshotManager getSnapshotManager() {
    return this.snapshotManager;
  }

  /** Returns the underlying MasterProcedureManagerHost */
  @Override
  public MasterProcedureManagerHost getMasterProcedureManagerHost() {
    return mpmHost;
  }

  /** Returns the cluster schema service, used for namespace operations. */
  @Override
  public ClusterSchema getClusterSchema() {
    return this.clusterSchemaService;
  }
3570
3571  /**
3572   * Create a new Namespace.
3573   * @param namespaceDescriptor descriptor for new Namespace
3574   * @param nonceGroup          Identifier for the source of the request, a client or process.
3575   * @param nonce               A unique identifier for this operation from the client or process
3576   *                            identified by <code>nonceGroup</code> (the source must ensure each
3577   *                            operation gets a unique id).
3578   * @return procedure id
3579   */
3580  long createNamespace(final NamespaceDescriptor namespaceDescriptor, final long nonceGroup,
3581    final long nonce) throws IOException {
3582    checkInitialized();
3583
3584    TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName()));
3585
3586    return MasterProcedureUtil
3587      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
3588        @Override
3589        protected void run() throws IOException {
3590          getMaster().getMasterCoprocessorHost().preCreateNamespace(namespaceDescriptor);
3591          // We need to wait for the procedure to potentially fail due to "prepare" sanity
3592          // checks. This will block only the beginning of the procedure. See HBASE-19953.
3593          ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
3594          LOG.info(getClientIdAuditPrefix() + " creating " + namespaceDescriptor);
3595          // Execute the operation synchronously - wait for the operation to complete before
3596          // continuing.
3597          setProcId(getClusterSchema().createNamespace(namespaceDescriptor, getNonceKey(), latch));
3598          latch.await();
3599          getMaster().getMasterCoprocessorHost().postCreateNamespace(namespaceDescriptor);
3600        }
3601
3602        @Override
3603        protected String getDescription() {
3604          return "CreateNamespaceProcedure";
3605        }
3606      });
3607  }
3608
3609  /**
3610   * Modify an existing Namespace.
3611   * @param nonceGroup Identifier for the source of the request, a client or process.
3612   * @param nonce      A unique identifier for this operation from the client or process identified
3613   *                   by <code>nonceGroup</code> (the source must ensure each operation gets a
3614   *                   unique id).
3615   * @return procedure id
3616   */
3617  long modifyNamespace(final NamespaceDescriptor newNsDescriptor, final long nonceGroup,
3618    final long nonce) throws IOException {
3619    checkInitialized();
3620
3621    TableName.isLegalNamespaceName(Bytes.toBytes(newNsDescriptor.getName()));
3622
3623    return MasterProcedureUtil
3624      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
3625        @Override
3626        protected void run() throws IOException {
3627          NamespaceDescriptor oldNsDescriptor = getNamespace(newNsDescriptor.getName());
3628          getMaster().getMasterCoprocessorHost().preModifyNamespace(oldNsDescriptor,
3629            newNsDescriptor);
3630          // We need to wait for the procedure to potentially fail due to "prepare" sanity
3631          // checks. This will block only the beginning of the procedure. See HBASE-19953.
3632          ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
3633          LOG.info(getClientIdAuditPrefix() + " modify " + newNsDescriptor);
3634          // Execute the operation synchronously - wait for the operation to complete before
3635          // continuing.
3636          setProcId(getClusterSchema().modifyNamespace(newNsDescriptor, getNonceKey(), latch));
3637          latch.await();
3638          getMaster().getMasterCoprocessorHost().postModifyNamespace(oldNsDescriptor,
3639            newNsDescriptor);
3640        }
3641
3642        @Override
3643        protected String getDescription() {
3644          return "ModifyNamespaceProcedure";
3645        }
3646      });
3647  }
3648
3649  /**
3650   * Delete an existing Namespace. Only empty Namespaces (no tables) can be removed.
3651   * @param nonceGroup Identifier for the source of the request, a client or process.
3652   * @param nonce      A unique identifier for this operation from the client or process identified
3653   *                   by <code>nonceGroup</code> (the source must ensure each operation gets a
3654   *                   unique id).
3655   * @return procedure id
3656   */
3657  long deleteNamespace(final String name, final long nonceGroup, final long nonce)
3658    throws IOException {
3659    checkInitialized();
3660
3661    return MasterProcedureUtil
3662      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
3663        @Override
3664        protected void run() throws IOException {
3665          getMaster().getMasterCoprocessorHost().preDeleteNamespace(name);
3666          LOG.info(getClientIdAuditPrefix() + " delete " + name);
3667          // Execute the operation synchronously - wait for the operation to complete before
3668          // continuing.
3669          //
3670          // We need to wait for the procedure to potentially fail due to "prepare" sanity
3671          // checks. This will block only the beginning of the procedure. See HBASE-19953.
3672          ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
3673          setProcId(submitProcedure(
3674            new DeleteNamespaceProcedure(procedureExecutor.getEnvironment(), name, latch)));
3675          latch.await();
3676          // Will not be invoked in the face of Exception thrown by the Procedure's execution
3677          getMaster().getMasterCoprocessorHost().postDeleteNamespace(name);
3678        }
3679
3680        @Override
3681        protected String getDescription() {
3682          return "DeleteNamespaceProcedure";
3683        }
3684      });
3685  }
3686
3687  /**
3688   * Get a Namespace
3689   * @param name Name of the Namespace
3690   * @return Namespace descriptor for <code>name</code>
3691   */
3692  NamespaceDescriptor getNamespace(String name) throws IOException {
3693    checkInitialized();
3694    if (this.cpHost != null) this.cpHost.preGetNamespaceDescriptor(name);
3695    NamespaceDescriptor nsd = this.clusterSchemaService.getNamespace(name);
3696    if (this.cpHost != null) this.cpHost.postGetNamespaceDescriptor(nsd);
3697    return nsd;
3698  }
3699
3700  /**
3701   * Get all Namespaces
3702   * @return All Namespace descriptors
3703   */
3704  List<NamespaceDescriptor> getNamespaces() throws IOException {
3705    checkInitialized();
3706    final List<NamespaceDescriptor> nsds = new ArrayList<>();
3707    if (cpHost != null) {
3708      cpHost.preListNamespaceDescriptors(nsds);
3709    }
3710    nsds.addAll(this.clusterSchemaService.getNamespaces());
3711    if (this.cpHost != null) {
3712      this.cpHost.postListNamespaceDescriptors(nsds);
3713    }
3714    return nsds;
3715  }
3716
3717  /**
3718   * List namespace names
3719   * @return All namespace names
3720   */
3721  public List<String> listNamespaces() throws IOException {
3722    checkInitialized();
3723    List<String> namespaces = new ArrayList<>();
3724    if (cpHost != null) {
3725      cpHost.preListNamespaces(namespaces);
3726    }
3727    for (NamespaceDescriptor namespace : clusterSchemaService.getNamespaces()) {
3728      namespaces.add(namespace.getName());
3729    }
3730    if (cpHost != null) {
3731      cpHost.postListNamespaces(namespaces);
3732    }
3733    return namespaces;
3734  }
3735
  /** Returns the names of all tables (including system tables) in the given namespace. */
  @Override
  public List<TableName> listTableNamesByNamespace(String name) throws IOException {
    checkInitialized();
    return listTableNames(name, null, true);
  }

  /** Returns the descriptors of all tables (including system tables) in the given namespace. */
  @Override
  public List<TableDescriptor> listTableDescriptorsByNamespace(String name) throws IOException {
    checkInitialized();
    return listTableDescriptors(name, null, null, true);
  }
3747
3748  @Override
3749  public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning)
3750    throws IOException {
3751    if (cpHost != null) {
3752      cpHost.preAbortProcedure(this.procedureExecutor, procId);
3753    }
3754
3755    final boolean result = this.procedureExecutor.abort(procId, mayInterruptIfRunning);
3756
3757    if (cpHost != null) {
3758      cpHost.postAbortProcedure();
3759    }
3760
3761    return result;
3762  }
3763
3764  @Override
3765  public List<Procedure<?>> getProcedures() throws IOException {
3766    if (cpHost != null) {
3767      cpHost.preGetProcedures();
3768    }
3769
3770    @SuppressWarnings({ "unchecked", "rawtypes" })
3771    List<Procedure<?>> procList = (List) this.procedureExecutor.getProcedures();
3772
3773    if (cpHost != null) {
3774      cpHost.postGetProcedures(procList);
3775    }
3776
3777    return procList;
3778  }
3779
3780  @Override
3781  public List<LockedResource> getLocks() throws IOException {
3782    if (cpHost != null) {
3783      cpHost.preGetLocks();
3784    }
3785
3786    MasterProcedureScheduler procedureScheduler =
3787      procedureExecutor.getEnvironment().getProcedureScheduler();
3788
3789    final List<LockedResource> lockedResources = procedureScheduler.getLocks();
3790
3791    if (cpHost != null) {
3792      cpHost.postGetLocks(lockedResources);
3793    }
3794
3795    return lockedResources;
3796  }
3797
3798  /**
3799   * Returns the list of table descriptors that match the specified request
3800   * @param namespace        the namespace to query, or null if querying for all
3801   * @param regex            The regular expression to match against, or null if querying for all
3802   * @param tableNameList    the list of table names, or null if querying for all
3803   * @param includeSysTables False to match only against userspace tables
3804   * @return the list of table descriptors
3805   */
3806  public List<TableDescriptor> listTableDescriptors(final String namespace, final String regex,
3807    final List<TableName> tableNameList, final boolean includeSysTables) throws IOException {
3808    List<TableDescriptor> htds = new ArrayList<>();
3809    if (cpHost != null) {
3810      cpHost.preGetTableDescriptors(tableNameList, htds, regex);
3811    }
3812    htds = getTableDescriptors(htds, namespace, regex, tableNameList, includeSysTables);
3813    if (cpHost != null) {
3814      cpHost.postGetTableDescriptors(tableNameList, htds, regex);
3815    }
3816    return htds;
3817  }
3818
3819  /**
3820   * Returns the list of table names that match the specified request
3821   * @param regex            The regular expression to match against, or null if querying for all
3822   * @param namespace        the namespace to query, or null if querying for all
3823   * @param includeSysTables False to match only against userspace tables
3824   * @return the list of table names
3825   */
3826  public List<TableName> listTableNames(final String namespace, final String regex,
3827    final boolean includeSysTables) throws IOException {
3828    List<TableDescriptor> htds = new ArrayList<>();
3829    if (cpHost != null) {
3830      cpHost.preGetTableNames(htds, regex);
3831    }
3832    htds = getTableDescriptors(htds, namespace, regex, null, includeSysTables);
3833    if (cpHost != null) {
3834      cpHost.postGetTableNames(htds, regex);
3835    }
3836    List<TableName> result = new ArrayList<>(htds.size());
3837    for (TableDescriptor htd : htds)
3838      result.add(htd.getTableName());
3839    return result;
3840  }
3841
3842  /**
3843   * Return a list of table table descriptors after applying any provided filter parameters. Note
3844   * that the user-facing description of this filter logic is presented on the class-level javadoc
3845   * of {@link NormalizeTableFilterParams}.
3846   */
3847  private List<TableDescriptor> getTableDescriptors(final List<TableDescriptor> htds,
3848    final String namespace, final String regex, final List<TableName> tableNameList,
3849    final boolean includeSysTables) throws IOException {
3850    if (tableNameList == null || tableNameList.isEmpty()) {
3851      // request for all TableDescriptors
3852      Collection<TableDescriptor> allHtds;
3853      if (namespace != null && namespace.length() > 0) {
3854        // Do a check on the namespace existence. Will fail if does not exist.
3855        this.clusterSchemaService.getNamespace(namespace);
3856        allHtds = tableDescriptors.getByNamespace(namespace).values();
3857      } else {
3858        allHtds = tableDescriptors.getAll().values();
3859      }
3860      for (TableDescriptor desc : allHtds) {
3861        if (
3862          tableStateManager.isTablePresent(desc.getTableName())
3863            && (includeSysTables || !desc.getTableName().isSystemTable())
3864        ) {
3865          htds.add(desc);
3866        }
3867      }
3868    } else {
3869      for (TableName s : tableNameList) {
3870        if (tableStateManager.isTablePresent(s)) {
3871          TableDescriptor desc = tableDescriptors.get(s);
3872          if (desc != null) {
3873            htds.add(desc);
3874          }
3875        }
3876      }
3877    }
3878
3879    // Retains only those matched by regular expression.
3880    if (regex != null) filterTablesByRegex(htds, Pattern.compile(regex));
3881    return htds;
3882  }
3883
3884  /**
3885   * Removes the table descriptors that don't match the pattern.
3886   * @param descriptors list of table descriptors to filter
3887   * @param pattern     the regex to use
3888   */
3889  private static void filterTablesByRegex(final Collection<TableDescriptor> descriptors,
3890    final Pattern pattern) {
3891    final String defaultNS = NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
3892    Iterator<TableDescriptor> itr = descriptors.iterator();
3893    while (itr.hasNext()) {
3894      TableDescriptor htd = itr.next();
3895      String tableName = htd.getTableName().getNameAsString();
3896      boolean matched = pattern.matcher(tableName).matches();
3897      if (!matched && htd.getTableName().getNamespaceAsString().equals(defaultNS)) {
3898        matched = pattern.matcher(defaultNS + TableName.NAMESPACE_DELIM + tableName).matches();
3899      }
3900      if (!matched) {
3901        itr.remove();
3902      }
3903    }
3904  }
3905
  /** Returns the last major compaction timestamp for the given table, from live-server metrics. */
  @Override
  public long getLastMajorCompactionTimestamp(TableName table) throws IOException {
    return getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
      .getLastMajorCompactionTimestamp(table);
  }

  /** Returns the last major compaction timestamp for the given region, from live-server metrics. */
  @Override
  public long getLastMajorCompactionTimestampForRegion(byte[] regionName) throws IOException {
    return getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
      .getLastMajorCompactionTimestamp(regionName);
  }
3917
3918  /**
3919   * Gets the mob file compaction state for a specific table. Whether all the mob files are selected
3920   * is known during the compaction execution, but the statistic is done just before compaction
3921   * starts, it is hard to know the compaction type at that time, so the rough statistics are chosen
3922   * for the mob file compaction. Only two compaction states are available,
3923   * CompactionState.MAJOR_AND_MINOR and CompactionState.NONE.
3924   * @param tableName The current table name.
3925   * @return If a given table is in mob file compaction now.
3926   */
3927  public GetRegionInfoResponse.CompactionState getMobCompactionState(TableName tableName) {
3928    AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
3929    if (compactionsCount != null && compactionsCount.get() != 0) {
3930      return GetRegionInfoResponse.CompactionState.MAJOR_AND_MINOR;
3931    }
3932    return GetRegionInfoResponse.CompactionState.NONE;
3933  }
3934
3935  public void reportMobCompactionStart(TableName tableName) throws IOException {
3936    IdLock.Entry lockEntry = null;
3937    try {
3938      lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
3939      AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
3940      if (compactionsCount == null) {
3941        compactionsCount = new AtomicInteger(0);
3942        mobCompactionStates.put(tableName, compactionsCount);
3943      }
3944      compactionsCount.incrementAndGet();
3945    } finally {
3946      if (lockEntry != null) {
3947        mobCompactionLock.releaseLockEntry(lockEntry);
3948      }
3949    }
3950  }
3951
3952  public void reportMobCompactionEnd(TableName tableName) throws IOException {
3953    IdLock.Entry lockEntry = null;
3954    try {
3955      lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
3956      AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
3957      if (compactionsCount != null) {
3958        int count = compactionsCount.decrementAndGet();
3959        // remove the entry if the count is 0.
3960        if (count == 0) {
3961          mobCompactionStates.remove(tableName);
3962        }
3963      }
3964    } finally {
3965      if (lockEntry != null) {
3966        mobCompactionLock.releaseLockEntry(lockEntry);
3967      }
3968    }
3969  }
3970
3971  /**
3972   * Queries the state of the {@link LoadBalancerStateStore}. If the balancer is not initialized,
3973   * false is returned.
3974   * @return The state of the load balancer, or false if the load balancer isn't defined.
3975   */
3976  public boolean isBalancerOn() {
3977    return !isInMaintenanceMode() && loadBalancerStateStore != null && loadBalancerStateStore.get();
3978  }
3979
3980  /**
3981   * Queries the state of the {@link RegionNormalizerStateStore}. If it's not initialized, false is
3982   * returned.
3983   */
3984  public boolean isNormalizerOn() {
3985    return !isInMaintenanceMode() && getRegionNormalizerManager().isNormalizerOn();
3986  }
3987
3988  /**
3989   * Queries the state of the {@link SplitOrMergeStateStore}. If it is not initialized, false is
3990   * returned. If switchType is illegal, false will return.
3991   * @param switchType see {@link org.apache.hadoop.hbase.client.MasterSwitchType}
3992   * @return The state of the switch
3993   */
3994  @Override
3995  public boolean isSplitOrMergeEnabled(MasterSwitchType switchType) {
3996    return !isInMaintenanceMode() && splitOrMergeStateStore != null
3997      && splitOrMergeStateStore.isSplitOrMergeEnabled(switchType);
3998  }
3999
4000  /**
4001   * Fetch the configured {@link LoadBalancer} class name. If none is set, a default is returned.
4002   * <p/>
4003   * Notice that, the base load balancer will always be {@link RSGroupBasedLoadBalancer} now, so
4004   * this method will return the balancer used inside each rs group.
4005   * @return The name of the {@link LoadBalancer} in use.
4006   */
4007  public String getLoadBalancerClassName() {
4008    return conf.get(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
4009      LoadBalancerFactory.getDefaultLoadBalancerClass().getName());
4010  }
4011
  /** Returns the store tracking the split/merge switch state. */
  public SplitOrMergeStateStore getSplitOrMergeStateStore() {
    return splitOrMergeStateStore;
  }

  /** Returns the master's load balancer. */
  @Override
  public RSGroupBasedLoadBalancer getLoadBalancer() {
    return balancer;
  }

  /** Returns the favored-nodes manager, obtained from the load balancer. */
  @Override
  public FavoredNodesManager getFavoredNodesManager() {
    return balancer.getFavoredNodesManager();
  }
4025
  /**
   * Submit a replication peer procedure and wait on its latch before returning.
   * @return the submitted procedure's id
   * @throws IOException if replication peer modification is currently disabled
   */
  private long executePeerProcedure(AbstractPeerProcedure<?> procedure) throws IOException {
    if (!isReplicationPeerModificationEnabled()) {
      throw new IOException("Replication peer modification disabled");
    }
    long procId = procedureExecutor.submitProcedure(procedure);
    // Block until the procedure's latch releases so early failures surface to the caller.
    procedure.getLatch().await();
    return procId;
  }
4034
4035  @Override
4036  public long addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
4037    throws ReplicationException, IOException {
4038    LOG.info(getClientIdAuditPrefix() + " creating replication peer, id=" + peerId + ", config="
4039      + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"));
4040    return executePeerProcedure(new AddPeerProcedure(peerId, peerConfig, enabled));
4041  }
4042
4043  @Override
4044  public long removeReplicationPeer(String peerId) throws ReplicationException, IOException {
4045    LOG.info(getClientIdAuditPrefix() + " removing replication peer, id=" + peerId);
4046    return executePeerProcedure(new RemovePeerProcedure(peerId));
4047  }
4048
4049  @Override
4050  public long enableReplicationPeer(String peerId) throws ReplicationException, IOException {
4051    LOG.info(getClientIdAuditPrefix() + " enable replication peer, id=" + peerId);
4052    return executePeerProcedure(new EnablePeerProcedure(peerId));
4053  }
4054
4055  @Override
4056  public long disableReplicationPeer(String peerId) throws ReplicationException, IOException {
4057    LOG.info(getClientIdAuditPrefix() + " disable replication peer, id=" + peerId);
4058    return executePeerProcedure(new DisablePeerProcedure(peerId));
4059  }
4060
4061  @Override
4062  public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
4063    throws ReplicationException, IOException {
4064    if (cpHost != null) {
4065      cpHost.preGetReplicationPeerConfig(peerId);
4066    }
4067    LOG.info(getClientIdAuditPrefix() + " get replication peer config, id=" + peerId);
4068    ReplicationPeerConfig peerConfig = this.replicationPeerManager.getPeerConfig(peerId)
4069      .orElseThrow(() -> new ReplicationPeerNotFoundException(peerId));
4070    if (cpHost != null) {
4071      cpHost.postGetReplicationPeerConfig(peerId);
4072    }
4073    return peerConfig;
4074  }
4075
4076  @Override
4077  public long updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig)
4078    throws ReplicationException, IOException {
4079    LOG.info(getClientIdAuditPrefix() + " update replication peer config, id=" + peerId
4080      + ", config=" + peerConfig);
4081    return executePeerProcedure(new UpdatePeerConfigProcedure(peerId, peerConfig));
4082  }
4083
4084  @Override
4085  public List<ReplicationPeerDescription> listReplicationPeers(String regex)
4086    throws ReplicationException, IOException {
4087    if (cpHost != null) {
4088      cpHost.preListReplicationPeers(regex);
4089    }
4090    LOG.debug("{} list replication peers, regex={}", getClientIdAuditPrefix(), regex);
4091    Pattern pattern = regex == null ? null : Pattern.compile(regex);
4092    List<ReplicationPeerDescription> peers = this.replicationPeerManager.listPeers(pattern);
4093    if (cpHost != null) {
4094      cpHost.postListReplicationPeers(regex);
4095    }
4096    return peers;
4097  }
4098
4099  @Override
4100  public long transitReplicationPeerSyncReplicationState(String peerId, SyncReplicationState state)
4101    throws ReplicationException, IOException {
4102    LOG.info(
4103      getClientIdAuditPrefix()
4104        + " transit current cluster state to {} in a synchronous replication peer id={}",
4105      state, peerId);
4106    return executePeerProcedure(new TransitPeerSyncReplicationStateProcedure(peerId, state));
4107  }
4108
  /** Turn replication peer modification operations on or off. */
  @Override
  public boolean replicationPeerModificationSwitch(boolean on) throws IOException {
    return replicationPeerModificationStateStore.set(on);
  }

  /** Returns whether replication peer modification operations are currently allowed. */
  @Override
  public boolean isReplicationPeerModificationEnabled() {
    return replicationPeerModificationStateStore.get();
  }
4118
4119  /**
4120   * Mark region server(s) as decommissioned (previously called 'draining') to prevent additional
4121   * regions from getting assigned to them. Also unload the regions on the servers asynchronously.0
4122   * @param servers Region servers to decommission.
4123   */
4124  public void decommissionRegionServers(final List<ServerName> servers, final boolean offload)
4125    throws IOException {
4126    List<ServerName> serversAdded = new ArrayList<>(servers.size());
4127    // Place the decommission marker first.
4128    String parentZnode = getZooKeeper().getZNodePaths().drainingZNode;
4129    for (ServerName server : servers) {
4130      try {
4131        String node = ZNodePaths.joinZNode(parentZnode, server.getServerName());
4132        ZKUtil.createAndFailSilent(getZooKeeper(), node);
4133      } catch (KeeperException ke) {
4134        throw new HBaseIOException(
4135          this.zooKeeper.prefix("Unable to decommission '" + server.getServerName() + "'."), ke);
4136      }
4137      if (this.serverManager.addServerToDrainList(server)) {
4138        serversAdded.add(server);
4139      }
4140    }
4141    // Move the regions off the decommissioned servers.
4142    if (offload) {
4143      final List<ServerName> destServers = this.serverManager.createDestinationServersList();
4144      for (ServerName server : serversAdded) {
4145        final List<RegionInfo> regionsOnServer = this.assignmentManager.getRegionsOnServer(server);
4146        for (RegionInfo hri : regionsOnServer) {
4147          ServerName dest = balancer.randomAssignment(hri, destServers);
4148          if (dest == null) {
4149            throw new HBaseIOException("Unable to determine a plan to move " + hri);
4150          }
4151          RegionPlan rp = new RegionPlan(hri, server, dest);
4152          this.assignmentManager.moveAsync(rp);
4153        }
4154      }
4155    }
4156  }
4157
4158  /**
4159   * List region servers marked as decommissioned (previously called 'draining') to not get regions
4160   * assigned to them.
4161   * @return List of decommissioned servers.
4162   */
4163  public List<ServerName> listDecommissionedRegionServers() {
4164    return this.serverManager.getDrainingServersList();
4165  }
4166
4167  /**
4168   * Remove decommission marker (previously called 'draining') from a region server to allow regions
4169   * assignments. Load regions onto the server asynchronously if a list of regions is given
4170   * @param server Region server to remove decommission marker from.
4171   */
4172  public void recommissionRegionServer(final ServerName server,
4173    final List<byte[]> encodedRegionNames) throws IOException {
4174    // Remove the server from decommissioned (draining) server list.
4175    String parentZnode = getZooKeeper().getZNodePaths().drainingZNode;
4176    String node = ZNodePaths.joinZNode(parentZnode, server.getServerName());
4177    try {
4178      ZKUtil.deleteNodeFailSilent(getZooKeeper(), node);
4179    } catch (KeeperException ke) {
4180      throw new HBaseIOException(
4181        this.zooKeeper.prefix("Unable to recommission '" + server.getServerName() + "'."), ke);
4182    }
4183    this.serverManager.removeServerFromDrainList(server);
4184
4185    // Load the regions onto the server if we are given a list of regions.
4186    if (encodedRegionNames == null || encodedRegionNames.isEmpty()) {
4187      return;
4188    }
4189    if (!this.serverManager.isServerOnline(server)) {
4190      return;
4191    }
4192    for (byte[] encodedRegionName : encodedRegionNames) {
4193      RegionState regionState =
4194        assignmentManager.getRegionStates().getRegionState(Bytes.toString(encodedRegionName));
4195      if (regionState == null) {
4196        LOG.warn("Unknown region " + Bytes.toStringBinary(encodedRegionName));
4197        continue;
4198      }
4199      RegionInfo hri = regionState.getRegion();
4200      if (server.equals(regionState.getServerName())) {
4201        LOG.info("Skipping move of region " + hri.getRegionNameAsString()
4202          + " because region already assigned to the same server " + server + ".");
4203        continue;
4204      }
4205      RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), server);
4206      this.assignmentManager.moveAsync(rp);
4207    }
4208  }
4209
4210  @Override
4211  public LockManager getLockManager() {
4212    return lockManager;
4213  }
4214
4215  public QuotaObserverChore getQuotaObserverChore() {
4216    return this.quotaObserverChore;
4217  }
4218
4219  public SpaceQuotaSnapshotNotifier getSpaceQuotaSnapshotNotifier() {
4220    return this.spaceQuotaSnapshotNotifier;
4221  }
4222
4223  @SuppressWarnings("unchecked")
4224  private RemoteProcedure<MasterProcedureEnv, ?> getRemoteProcedure(long procId) {
4225    Procedure<?> procedure = procedureExecutor.getProcedure(procId);
4226    if (procedure == null) {
4227      return null;
4228    }
4229    assert procedure instanceof RemoteProcedure;
4230    return (RemoteProcedure<MasterProcedureEnv, ?>) procedure;
4231  }
4232
4233  public void remoteProcedureCompleted(long procId, byte[] remoteResultData) {
4234    LOG.debug("Remote procedure done, pid={}", procId);
4235    RemoteProcedure<MasterProcedureEnv, ?> procedure = getRemoteProcedure(procId);
4236    if (procedure != null) {
4237      procedure.remoteOperationCompleted(procedureExecutor.getEnvironment(), remoteResultData);
4238    }
4239  }
4240
4241  public void remoteProcedureFailed(long procId, RemoteProcedureException error) {
4242    LOG.debug("Remote procedure failed, pid={}", procId, error);
4243    RemoteProcedure<MasterProcedureEnv, ?> procedure = getRemoteProcedure(procId);
4244    if (procedure != null) {
4245      procedure.remoteOperationFailed(procedureExecutor.getEnvironment(), error);
4246    }
4247  }
4248
4249  /**
4250   * Reopen regions provided in the argument
4251   * @param tableName   The current table name
4252   * @param regionNames The region names of the regions to reopen
4253   * @param nonceGroup  Identifier for the source of the request, a client or process
4254   * @param nonce       A unique identifier for this operation from the client or process identified
4255   *                    by <code>nonceGroup</code> (the source must ensure each operation gets a
4256   *                    unique id).
4257   * @return procedure Id
4258   * @throws IOException if reopening region fails while running procedure
4259   */
4260  long reopenRegions(final TableName tableName, final List<byte[]> regionNames,
4261    final long nonceGroup, final long nonce) throws IOException {
4262
4263    return MasterProcedureUtil
4264      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
4265
4266        @Override
4267        protected void run() throws IOException {
4268          submitProcedure(new ReopenTableRegionsProcedure(tableName, regionNames));
4269        }
4270
4271        @Override
4272        protected String getDescription() {
4273          return "ReopenTableRegionsProcedure";
4274        }
4275
4276      });
4277
4278  }
4279
4280  /**
4281   * Reopen regions provided in the argument. Applies throttling to the procedure to avoid
4282   * overwhelming the system. This is used by the reopenTableRegions methods in the Admin API via
4283   * HMaster.
4284   * @param tableName   The current table name
4285   * @param regionNames The region names of the regions to reopen
4286   * @param nonceGroup  Identifier for the source of the request, a client or process
4287   * @param nonce       A unique identifier for this operation from the client or process identified
4288   *                    by <code>nonceGroup</code> (the source must ensure each operation gets a
4289   *                    unique id).
4290   * @return procedure Id
4291   * @throws IOException if reopening region fails while running procedure
4292   */
4293  long reopenRegionsThrottled(final TableName tableName, final List<byte[]> regionNames,
4294    final long nonceGroup, final long nonce) throws IOException {
4295
4296    checkInitialized();
4297
4298    if (!tableStateManager.isTablePresent(tableName)) {
4299      throw new TableNotFoundException(tableName);
4300    }
4301
4302    return MasterProcedureUtil
4303      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
4304        @Override
4305        protected void run() throws IOException {
4306          ReopenTableRegionsProcedure proc;
4307          if (regionNames.isEmpty()) {
4308            proc = ReopenTableRegionsProcedure.throttled(getConfiguration(),
4309              getTableDescriptors().get(tableName));
4310          } else {
4311            proc = ReopenTableRegionsProcedure.throttled(getConfiguration(),
4312              getTableDescriptors().get(tableName), regionNames);
4313          }
4314
4315          LOG.info("{} throttled reopening {} regions for table {}", getClientIdAuditPrefix(),
4316            regionNames.isEmpty() ? "all" : regionNames.size(), tableName);
4317
4318          submitProcedure(proc);
4319        }
4320
4321        @Override
4322        protected String getDescription() {
4323          return "Throttled ReopenTableRegionsProcedure for " + tableName;
4324        }
4325      });
4326  }
4327
4328  @Override
4329  public ReplicationPeerManager getReplicationPeerManager() {
4330    return replicationPeerManager;
4331  }
4332
4333  @Override
4334  public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() {
4335    return replicationLogCleanerBarrier;
4336  }
4337
4338  @Override
4339  public Semaphore getSyncReplicationPeerLock() {
4340    return syncReplicationPeerLock;
4341  }
4342
4343  public HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>>
4344    getReplicationLoad(ServerName[] serverNames) {
4345    List<ReplicationPeerDescription> peerList = this.getReplicationPeerManager().listPeers(null);
4346    if (peerList == null) {
4347      return null;
4348    }
4349    HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoadSourceMap =
4350      new HashMap<>(peerList.size());
4351    peerList.stream()
4352      .forEach(peer -> replicationLoadSourceMap.put(peer.getPeerId(), new ArrayList<>()));
4353    for (ServerName serverName : serverNames) {
4354      List<ReplicationLoadSource> replicationLoadSources =
4355        getServerManager().getLoad(serverName).getReplicationLoadSourceList();
4356      for (ReplicationLoadSource replicationLoadSource : replicationLoadSources) {
4357        List<Pair<ServerName, ReplicationLoadSource>> replicationLoadSourceList =
4358          replicationLoadSourceMap.get(replicationLoadSource.getPeerID());
4359        if (replicationLoadSourceList == null) {
4360          LOG.debug("{} does not exist, but it exists "
4361            + "in znode(/hbase/replication/rs). when the rs restarts, peerId is deleted, so "
4362            + "we just need to ignore it", replicationLoadSource.getPeerID());
4363          continue;
4364        }
4365        replicationLoadSourceList.add(new Pair<>(serverName, replicationLoadSource));
4366      }
4367    }
4368    for (List<Pair<ServerName, ReplicationLoadSource>> loads : replicationLoadSourceMap.values()) {
4369      if (loads.size() > 0) {
4370        loads.sort(Comparator.comparingLong(load -> (-1) * load.getSecond().getReplicationLag()));
4371      }
4372    }
4373    return replicationLoadSourceMap;
4374  }
4375
4376  /**
4377   * This method modifies the master's configuration in order to inject replication-related features
4378   */
4379  @InterfaceAudience.Private
4380  public static void decorateMasterConfiguration(Configuration conf) {
4381    String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
4382    String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
4383    if (plugins == null || !plugins.contains(cleanerClass)) {
4384      conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
4385    }
4386    if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
4387      plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
4388      cleanerClass = ReplicationHFileCleaner.class.getCanonicalName();
4389      if (!plugins.contains(cleanerClass)) {
4390        conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, plugins + "," + cleanerClass);
4391      }
4392    }
4393  }
4394
4395  public SnapshotQuotaObserverChore getSnapshotQuotaObserverChore() {
4396    return this.snapshotQuotaChore;
4397  }
4398
4399  public ActiveMasterManager getActiveMasterManager() {
4400    return activeMasterManager;
4401  }
4402
4403  @Override
4404  public SyncReplicationReplayWALManager getSyncReplicationReplayWALManager() {
4405    return this.syncReplicationReplayWALManager;
4406  }
4407
4408  @Override
4409  public HbckChore getHbckChore() {
4410    return this.hbckChore;
4411  }
4412
4413  @Override
4414  public void runReplicationBarrierCleaner() {
4415    ReplicationBarrierCleaner rbc = this.replicationBarrierCleaner;
4416    if (rbc != null) {
4417      rbc.chore();
4418    }
4419  }
4420
4421  @Override
4422  public RSGroupInfoManager getRSGroupInfoManager() {
4423    return rsGroupInfoManager;
4424  }
4425
4426  /**
4427   * Get the compaction state of the table
4428   * @param tableName The table name
4429   * @return CompactionState Compaction state of the table
4430   */
4431  public CompactionState getCompactionState(final TableName tableName) {
4432    CompactionState compactionState = CompactionState.NONE;
4433    try {
4434      List<RegionInfo> regions = assignmentManager.getRegionStates().getRegionsOfTable(tableName);
4435      for (RegionInfo regionInfo : regions) {
4436        ServerName serverName =
4437          assignmentManager.getRegionStates().getRegionServerOfRegion(regionInfo);
4438        if (serverName == null) {
4439          continue;
4440        }
4441        ServerMetrics sl = serverManager.getLoad(serverName);
4442        if (sl == null) {
4443          continue;
4444        }
4445        RegionMetrics regionMetrics = sl.getRegionMetrics().get(regionInfo.getRegionName());
4446        if (regionMetrics == null) {
4447          LOG.warn("Can not get compaction details for the region: {} , it may be not online.",
4448            regionInfo.getRegionNameAsString());
4449          continue;
4450        }
4451        if (regionMetrics.getCompactionState() == CompactionState.MAJOR) {
4452          if (compactionState == CompactionState.MINOR) {
4453            compactionState = CompactionState.MAJOR_AND_MINOR;
4454          } else {
4455            compactionState = CompactionState.MAJOR;
4456          }
4457        } else if (regionMetrics.getCompactionState() == CompactionState.MINOR) {
4458          if (compactionState == CompactionState.MAJOR) {
4459            compactionState = CompactionState.MAJOR_AND_MINOR;
4460          } else {
4461            compactionState = CompactionState.MINOR;
4462          }
4463        }
4464      }
4465    } catch (Exception e) {
4466      compactionState = null;
4467      LOG.error("Exception when get compaction state for " + tableName.getNameAsString(), e);
4468    }
4469    return compactionState;
4470  }
4471
4472  @Override
4473  public MetaLocationSyncer getMetaLocationSyncer() {
4474    return metaLocationSyncer;
4475  }
4476
4477  @RestrictedApi(explanation = "Should only be called in tests", link = "",
4478      allowedOnPath = ".*/src/test/.*")
4479  public MasterRegion getMasterRegion() {
4480    return masterRegion;
4481  }
4482
4483  @Override
4484  public void onConfigurationChange(Configuration newConf) {
4485    try {
4486      Superusers.initialize(newConf);
4487    } catch (IOException e) {
4488      LOG.warn("Failed to initialize SuperUsers on reloading of the configuration");
4489    }
4490    // append the quotas observer back to the master coprocessor key
4491    setQuotasObserver(newConf);
4492    // update region server coprocessor if the configuration has changed.
4493    if (
4494      CoprocessorConfigurationUtil.checkConfigurationChange(this.cpHost, newConf,
4495        CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY) && !maintenanceMode
4496    ) {
4497      LOG.info("Update the master coprocessor(s) because the configuration has changed");
4498      this.cpHost = new MasterCoprocessorHost(this, newConf);
4499    }
4500  }
4501
4502  @Override
4503  protected NamedQueueRecorder createNamedQueueRecord() {
4504    final boolean isBalancerDecisionRecording =
4505      conf.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
4506        BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
4507    final boolean isBalancerRejectionRecording =
4508      conf.getBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED,
4509        BaseLoadBalancer.DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED);
4510    if (isBalancerDecisionRecording || isBalancerRejectionRecording) {
4511      return NamedQueueRecorder.getInstance(conf);
4512    } else {
4513      return null;
4514    }
4515  }
4516
  /** The HMaster always runs in cluster mode, so this unconditionally returns {@code true}. */
  @Override
  protected boolean clusterMode() {
    return true;
  }
4521
4522  public String getClusterId() {
4523    if (activeMaster) {
4524      return clusterId;
4525    }
4526    return cachedClusterId.getFromCacheOrFetch();
4527  }
4528
4529  public Optional<ServerName> getActiveMaster() {
4530    return activeMasterManager.getActiveMasterServerName();
4531  }
4532
4533  public List<ServerName> getBackupMasters() {
4534    return activeMasterManager.getBackupMasters();
4535  }
4536
4537  @Override
4538  public Iterator<ServerName> getBootstrapNodes() {
4539    return regionServerTracker.getRegionServers().iterator();
4540  }
4541
4542  @Override
4543  public List<HRegionLocation> getMetaLocations() {
4544    return metaRegionLocationCache.getMetaRegionLocations();
4545  }
4546
4547  @Override
4548  public void flushMasterStore() throws IOException {
4549    LOG.info("Force flush master local region.");
4550    if (this.cpHost != null) {
4551      try {
4552        cpHost.preMasterStoreFlush();
4553      } catch (IOException ioe) {
4554        LOG.error("Error invoking master coprocessor preMasterStoreFlush()", ioe);
4555      }
4556    }
4557    masterRegion.flush(true);
4558    if (this.cpHost != null) {
4559      try {
4560        cpHost.postMasterStoreFlush();
4561      } catch (IOException ioe) {
4562        LOG.error("Error invoking master coprocessor postMasterStoreFlush()", ioe);
4563      }
4564    }
4565  }
4566
4567  public Collection<ServerName> getLiveRegionServers() {
4568    return regionServerTracker.getRegionServers();
4569  }
4570
4571  @RestrictedApi(explanation = "Should only be called in tests", link = "",
4572      allowedOnPath = ".*/src/test/.*")
4573  void setLoadBalancer(RSGroupBasedLoadBalancer loadBalancer) {
4574    this.balancer = loadBalancer;
4575  }
4576
4577  @RestrictedApi(explanation = "Should only be called in tests", link = "",
4578      allowedOnPath = ".*/src/test/.*")
4579  void setAssignmentManager(AssignmentManager assignmentManager) {
4580    this.assignmentManager = assignmentManager;
4581  }
4582
4583  @RestrictedApi(explanation = "Should only be called in tests", link = "",
4584      allowedOnPath = ".*/src/test/.*")
4585  static void setDisableBalancerChoreForTest(boolean disable) {
4586    disableBalancerChoreForTest = disable;
4587  }
4588
4589  private void setQuotasObserver(Configuration conf) {
4590    // Add the Observer to delete quotas on table deletion before starting all CPs by
4591    // default with quota support, avoiding if user specifically asks to not load this Observer.
4592    if (QuotaUtil.isQuotaEnabled(conf)) {
4593      updateConfigurationForQuotasObserver(conf);
4594    }
4595  }
4596
4597  @Override
4598  public long flushTable(TableName tableName, List<byte[]> columnFamilies, long nonceGroup,
4599    long nonce) throws IOException {
4600    checkInitialized();
4601
4602    if (
4603      !getConfiguration().getBoolean(MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED,
4604        MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED_DEFAULT)
4605    ) {
4606      throw new DoNotRetryIOException("FlushTableProcedureV2 is DISABLED");
4607    }
4608
4609    return MasterProcedureUtil
4610      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
4611        @Override
4612        protected void run() throws IOException {
4613          getMaster().getMasterCoprocessorHost().preTableFlush(tableName);
4614          LOG.info("{} flush {}", getClientIdAuditPrefix(), tableName);
4615          submitProcedure(
4616            new FlushTableProcedure(procedureExecutor.getEnvironment(), tableName, columnFamilies));
4617          getMaster().getMasterCoprocessorHost().postTableFlush(tableName);
4618        }
4619
4620        @Override
4621        protected String getDescription() {
4622          return "FlushTableProcedure";
4623        }
4624      });
4625  }
4626
4627  @Override
4628  public long rollAllWALWriters(long nonceGroup, long nonce) throws IOException {
4629    return MasterProcedureUtil
4630      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
4631        @Override
4632        protected void run() {
4633          LOG.info("{} roll all wal writers", getClientIdAuditPrefix());
4634          submitProcedure(new LogRollProcedure());
4635        }
4636
4637        @Override
4638        protected String getDescription() {
4639          return "RollAllWALWriters";
4640        }
4641      });
4642  }
4643
4644  @RestrictedApi(explanation = "Should only be called in tests", link = "",
4645      allowedOnPath = ".*/src/test/.*")
4646  public MobFileCleanerChore getMobFileCleanerChore() {
4647    return mobFileCleanerChore;
4648  }
4649
4650}