/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.master.cleaner.HFileCleaner.CUSTOM_POOL_SIZE;
import static org.apache.hadoop.hbase.util.DNS.MASTER_HOSTNAME_KEY;

import com.google.errorprone.annotations.RestrictedApi;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Service;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServlet;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.PleaseRestartMasterException;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ServerTask;
import org.apache.hadoop.hbase.ServerTaskBuilder;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.client.BalanceRequest;
import org.apache.hadoop.hbase.client.BalanceResponse;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.NormalizeTableFilterParams;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.MasterStoppedException;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
import org.apache.hadoop.hbase.http.HttpServer;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.MasterRpcServices.BalanceSwitchMode;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.master.balancer.BalancerChore;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.balancer.MaintenanceLoadBalancer;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner;
import org.apache.hadoop.hbase.master.cleaner.SnapshotCleanerChore;
import org.apache.hadoop.hbase.master.hbck.HbckChore;
import org.apache.hadoop.hbase.master.http.MasterDumpServlet;
import org.apache.hadoop.hbase.master.http.MasterRedirectServlet;
import org.apache.hadoop.hbase.master.http.MasterStatusServlet;
import org.apache.hadoop.hbase.master.http.api_v1.ResourceConfigFactory;
import org.apache.hadoop.hbase.master.janitor.CatalogJanitor;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.master.migrate.RollingUpgradeChore;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager;
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
import org.apache.hadoop.hbase.master.procedure.DeleteNamespaceProcedure;
import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
import org.apache.hadoop.hbase.master.procedure.InitMetaProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedureRunnable;
import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.master.replication.AddPeerProcedure;
import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure;
import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
import org.apache.hadoop.hbase.master.slowlog.SlowLogMasterService;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer;
import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
import org.apache.hadoop.hbase.mob.MobFileCleanerChore;
import org.apache.hadoop.hbase.mob.MobFileCompactionChore;
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
import org.apache.hadoop.hbase.procedure2.LockedResource;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
import org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.quotas.MasterQuotasObserver;
import org.apache.hadoop.hbase.quotas.QuotaObserverChore;
import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.SnapshotQuotaObserverChore;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifier;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifierFactory;
import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.storefiletracker.ModifyColumnFamilyStoreFileTrackerProcedure;
import org.apache.hadoop.hbase.regionserver.storefiletracker.ModifyTableStoreFileTrackerProcedure;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.TableDescriptorChecker;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
import org.apache.hadoop.hbase.zookeeper.SnapshotCleanupTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server;
import org.apache.hbase.thirdparty.org.eclipse.jetty.server.ServerConnector;
import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletHolder;
import org.apache.hbase.thirdparty.org.eclipse.jetty.webapp.WebAppContext;
import org.apache.hbase.thirdparty.org.glassfish.jersey.server.ResourceConfig;
import org.apache.hbase.thirdparty.org.glassfish.jersey.servlet.ServletContainer;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;

/**
 * HMaster is the "master server" for HBase. An HBase cluster has one active master. If many
 * masters are started, all compete. Whichever wins goes on to run the cluster. All others park
 * themselves in their constructor until master or cluster shutdown or until the active master
 * loses its lease in zookeeper. Thereafter, all running masters jostle to take over the master
 * role.
 * <p/>
 * The Master can be asked to shut down the cluster. See {@link #shutdown()}. In this case it will
 * tell all regionservers to go down and then wait on them all reporting in that they are down.
 * This master will then shut itself down.
 * <p/>
 * You can also shut down just this master. Call {@link #stopMaster()}.
 * @see org.apache.zookeeper.Watcher
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings("deprecation")
public class HMaster extends HRegionServer implements MasterServices {

  private static final Logger LOG = LoggerFactory.getLogger(HMaster.class);

  // MASTER is the name of the webapp and the attribute name used when stuffing this
  // instance into a web context !! AND OTHER PLACES !!
  public static final String MASTER = "master";

  // Manager and zk listener for master election
  private final ActiveMasterManager activeMasterManager;
  // Region server tracker
  private final RegionServerTracker regionServerTracker;
  // Draining region server tracker
  private DrainingServerTracker drainingServerTracker;
  // Tracker for load balancer state
  LoadBalancerTracker loadBalancerTracker;
  // Tracker for meta location, if any client ZK quorum specified
  private MetaLocationSyncer metaLocationSyncer;
  // Tracker for active master location, if any client ZK quorum specified
  @InterfaceAudience.Private
  MasterAddressSyncer masterAddressSyncer;
  // Tracker for auto snapshot cleanup state
  SnapshotCleanupTracker snapshotCleanupTracker;

  // Tracker for split and merge state
  private SplitOrMergeTracker splitOrMergeTracker;

  private ClusterSchemaService clusterSchemaService;

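  // Upper bound, in seconds, on how long startup and shutdown will wait on dependent services
  // such as the ClusterSchemaService before timing out; see the awaitTerminated call in run().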
  public static final String HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS =
    "hbase.master.wait.on.service.seconds";
  public static final int DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS = 5 * 60;

  public static final String HBASE_MASTER_CLEANER_INTERVAL = "hbase.master.cleaner.interval";

  public static final int DEFAULT_HBASE_MASTER_CLEANER_INTERVAL = 600 * 1000;

  // Metrics for the HMaster
  final MetricsMaster metricsMaster;
  // file system manager for the master FS operations
  private MasterFileSystem fileSystemManager;
  private MasterWalManager walManager;

  // manager to manage procedure-based WAL splitting, can be null if current
  // is zk-based WAL splitting. SplitWALManager will replace SplitLogManager
  // and MasterWalManager, which means zk-based WAL splitting code will be
  // useless after we switch to the procedure-based one. our eventual goal
  // is to remove all the zk-based WAL splitting code.
  private SplitWALManager splitWALManager;

  // server manager to deal with region server info
  private volatile ServerManager serverManager;

  // manager of assignment nodes in zookeeper
  private AssignmentManager assignmentManager;

  // manager of replication
  private ReplicationPeerManager replicationPeerManager;

  // buffer for "fatal error" notices from region servers
  // in the cluster. This is only used for assisting
  // operations/debugging.
  MemoryBoundedLogMessageBuffer rsFatals;

  // flag set after we become the active master (used for testing)
  private volatile boolean activeMaster = false;

  // flag set after we complete initialization once active
  private final ProcedureEvent<?> initialized = new ProcedureEvent<>("master initialized");

  // flag set after master services are started,
  // initialization may have not completed yet.
  volatile boolean serviceStarted = false;

  // Maximum time we should run balancer for
  private final int maxBalancingTime;
  // Maximum percent of regions in transition when balancing
  private final double maxRitPercent;

  private final LockManager lockManager = new LockManager(this);

  private LoadBalancer balancer;
  private BalancerChore balancerChore;
  private static boolean disableBalancerChoreForTest = false;
  private RegionNormalizerManager regionNormalizerManager;
  private ClusterStatusChore clusterStatusChore;
  private ClusterStatusPublisher clusterStatusPublisherChore = null;
  private SnapshotCleanerChore snapshotCleanerChore = null;

  private HbckChore hbckChore;
  CatalogJanitor catalogJanitorChore;
  // Threadpool for scanning the Old logs directory, used by the LogCleaner
  private DirScanPool logCleanerPool;
  private LogCleaner logCleaner;
  // HFile cleaners for the custom hfile archive paths and the default archive path
  // The archive path cleaner is the first element
  private List<HFileCleaner> hfileCleaners = new ArrayList<>();
  // The hfile cleaner paths, including custom paths and the default archive path
  private List<Path> hfileCleanerPaths = new ArrayList<>();
  // The shared hfile cleaner pool for the custom archive paths
  private DirScanPool sharedHFileCleanerPool;
  // The exclusive hfile cleaner pool for scanning the archive directory
  private DirScanPool exclusiveHFileCleanerPool;
  private ReplicationBarrierCleaner replicationBarrierCleaner;
  private MobFileCleanerChore mobFileCleanerChore;
  private MobFileCompactionChore mobFileCompactionChore;
  private RollingUpgradeChore rollingUpgradeChore;
  // used to synchronize the mobCompactionStates
  private final IdLock mobCompactionLock = new IdLock();
  // save the information of mob compactions in tables.
  // the key is table name, the value is the number of compactions in that table.
  private Map<TableName, AtomicInteger> mobCompactionStates = Maps.newConcurrentMap();

  volatile MasterCoprocessorHost cpHost;

  private final boolean preLoadTableDescriptors;

  // Time stamp for when this HMaster became the active master
  private long masterActiveTime;

  // Time stamp for when HMaster finishes becoming Active Master
  private long masterFinishedInitializationTime;

  Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  // monitor for snapshot of hbase tables
  SnapshotManager snapshotManager;
  // monitor for distributed procedures
  private MasterProcedureManagerHost mpmHost;

  private RegionsRecoveryChore regionsRecoveryChore = null;

  private RegionsRecoveryConfigManager regionsRecoveryConfigManager = null;
  // it is assigned after 'initialized' guard set to true, so should be volatile
  private volatile MasterQuotaManager quotaManager;
  private SpaceQuotaSnapshotNotifier spaceQuotaSnapshotNotifier;
  private QuotaObserverChore quotaObserverChore;
  private SnapshotQuotaObserverChore snapshotQuotaChore;

  private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
  private ProcedureStore procedureStore;

  // the master local storage to store procedure data, meta region locations, etc.
  private MasterRegion masterRegion;

  private RegionServerList rsListStorage;

  // handle table states
  private TableStateManager tableStateManager;

  /* Handle favored nodes information */
  private FavoredNodesManager favoredNodesManager;

  /** jetty server for master to redirect requests to regionserver infoServer */
  private Server masterJettyServer;

  // Determine if we should do normal startup or minimal "single-user" mode with no region
  // servers and no user tables. Useful for repair and recovery of hbase:meta
  private final boolean maintenanceMode;
  static final String MAINTENANCE_MODE = "hbase.master.maintenance_mode";

  // Cached clusterId on stand by masters to serve clusterID requests from clients.
  private final CachedClusterId cachedClusterId;

  public static final String WARMUP_BEFORE_MOVE = "hbase.master.warmup.before.move";
  private static final boolean DEFAULT_WARMUP_BEFORE_MOVE = true;

  /**
   * Initializes the HMaster. The steps are as follows:
   * <p>
   * <ol>
   * <li>Initialize the local HRegionServer
   * <li>Start the ActiveMasterManager.
   * </ol>
   * <p>
   * Remaining steps of initialization occur in
   * {@link #finishActiveMasterInitialization(MonitoredTask)} after the master becomes the active
   * one.
   */
  public HMaster(final Configuration conf) throws IOException {
    super(conf);
    final Span span = TraceUtil.createSpan("HMaster.cxtor");
    try (Scope ignored = span.makeCurrent()) {
      if (conf.getBoolean(MAINTENANCE_MODE, false)) {
        LOG.info("Detected {}=true via configuration.", MAINTENANCE_MODE);
        maintenanceMode = true;
      } else if (Boolean.getBoolean(MAINTENANCE_MODE)) {
        LOG.info("Detected {}=true via system properties.", MAINTENANCE_MODE);
        maintenanceMode = true;
      } else {
        maintenanceMode = false;
      }
      this.rsFatals = new MemoryBoundedLogMessageBuffer(
        conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024));
      LOG.info("hbase.rootdir={}, hbase.cluster.distributed={}", getDataRootDir(),
        this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));

      // Disable usage of meta replicas in the master
      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);

      decorateMasterConfiguration(this.conf);

      // Hack! Maps DFSClient => Master for logs. HDFS made this
      // config param for task trackers, but we can piggyback off of it.
      if (this.conf.get("mapreduce.task.attempt.id") == null) {
        this.conf.set("mapreduce.task.attempt.id", "hb_m_" + this.serverName.toString());
      }

      this.metricsMaster = new MetricsMaster(new MetricsMasterWrapperImpl(this));

      // preload table descriptor at startup
      this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);

      this.maxBalancingTime = getMaxBalancingTime();
      this.maxRitPercent = conf.getDouble(HConstants.HBASE_MASTER_BALANCER_MAX_RIT_PERCENT,
        HConstants.DEFAULT_HBASE_MASTER_BALANCER_MAX_RIT_PERCENT);

      // Do we publish the status?
      boolean shouldPublish =
        conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT);
      Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
        conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
          ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
          ClusterStatusPublisher.Publisher.class);

      if (shouldPublish) {
        if (publisherClass == null) {
          LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but "
            + ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS
            + " is not set - not publishing status");
        } else {
          clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
          LOG.debug("Created {}", this.clusterStatusPublisherChore);
          getChoreService().scheduleChore(clusterStatusPublisherChore);
        }
      }

      this.activeMasterManager = createActiveMasterManager(zooKeeper, serverName, this);

      cachedClusterId = new CachedClusterId(this, conf);

      this.regionServerTracker = new RegionServerTracker(zooKeeper, this);
      span.setStatus(StatusCode.OK);
    } catch (Throwable t) {
      // Make sure we log the exception. HMaster is often started via reflection and the
      // cause of failed startup is lost.
      TraceUtil.setError(span, t);
      LOG.error("Failed construction of Master", t);
      throw t;
    } finally {
      span.end();
    }
  }

  /**
   * Protected so that tests can supply a custom {@link ActiveMasterManager} implementation in
   * place of the default one.
   */
  protected ActiveMasterManager createActiveMasterManager(ZKWatcher zk, ServerName sn,
    org.apache.hadoop.hbase.Server server) throws InterruptedIOException {
    return new ActiveMasterManager(zk, sn, server);
  }

  @Override
  protected String getUseThisHostnameInstead(Configuration conf) {
    return conf.get(MASTER_HOSTNAME_KEY);
  }

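  // Register the components whose settings should be refreshed when the configuration is
  // reloaded at runtime.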
  private void registerConfigurationObservers() {
    configurationManager.registerObserver(this.rpcServices);
    configurationManager.registerObserver(this);
  }

  // Main run loop. Calls through to the regionserver run loop AFTER becoming active Master; will
  // block in here until then.
  @Override
  public void run() {
    try {
      registerConfigurationObservers();
      Threads.setDaemonThreadRunning(new Thread(() -> TraceUtil.trace(() -> {
        try {
          int infoPort = putUpJettyServer();
          startActiveMasterManager(infoPort);
        } catch (Throwable t) {
          // Make sure we log the exception.
          String error = "Failed to become Active Master";
          LOG.error(error, t);
          // Abort should have been called already.
          if (!isAborted()) {
            abort(error, t);
          }
        }
      }, "HMaster.becomeActiveMaster")), getName() + ":becomeActiveMaster");
      // Fall through here even if we have been aborted; the shutdown services still need to run
      // and the super.run() call below will do this for us.
      super.run();
    } finally {
      final Span span = TraceUtil.createSpan("HMaster exiting main loop");
      try (Scope ignored = span.makeCurrent()) {
        if (this.clusterSchemaService != null) {
          // If on the way out, then we are no longer the active master.
          this.clusterSchemaService.stopAsync();
          try {
            this.clusterSchemaService
              .awaitTerminated(getConfiguration().getInt(HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS,
                DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS), TimeUnit.SECONDS);
          } catch (TimeoutException te) {
            LOG.warn("Failed shutdown of clusterSchemaService", te);
          }
        }
        this.activeMaster = false;
        span.setStatus(StatusCode.OK);
      } finally {
        span.end();
      }
    }
  }

  // Returns the actual infoPort; -1 means the info server is disabled.
  private int putUpJettyServer() throws IOException {
    if (!conf.getBoolean("hbase.master.infoserver.redirect", true)) {
      return -1;
    }
    final int infoPort =
      conf.getInt("hbase.master.info.port.orig", HConstants.DEFAULT_MASTER_INFOPORT);
    // -1 is for disabling info server, so no redirecting
    if (infoPort < 0 || infoServer == null) {
      return -1;
    }
    if (infoPort == infoServer.getPort()) {
      // server is already running
      return infoPort;
    }
    final String addr = conf.get("hbase.master.info.bindAddress", "0.0.0.0");
    if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
      String msg = "Failed to start redirecting jetty server. Address " + addr
        + " does not belong to this host. Correct configuration parameter: "
        + "hbase.master.info.bindAddress";
      LOG.error(msg);
      throw new IOException(msg);
    }

    // TODO I'm pretty sure we could just add another binding to the InfoServer run by
    // the RegionServer and have it run the RedirectServlet instead of standing up
    // a second entire stack here.
    masterJettyServer = new Server();
    final ServerConnector connector = new ServerConnector(masterJettyServer);
    connector.setHost(addr);
    connector.setPort(infoPort);
    masterJettyServer.addConnector(connector);
    masterJettyServer.setStopAtShutdown(true);
    masterJettyServer.setHandler(HttpServer.buildGzipHandler(masterJettyServer.getHandler()));

    final String redirectHostname =
      StringUtils.isBlank(useThisHostnameInstead) ? null : useThisHostnameInstead;

    final MasterRedirectServlet redirect = new MasterRedirectServlet(infoServer, redirectHostname);
    final WebAppContext context =
      new WebAppContext(null, "/", null, null, null, null, WebAppContext.NO_SESSIONS);
    context.addServlet(new ServletHolder(redirect), "/*");
    context.setServer(masterJettyServer);

    try {
      masterJettyServer.start();
    } catch (Exception e) {
      throw new IOException("Failed to start redirecting jetty server", e);
    }
    return connector.getLocalPort();
  }

  /**
   * For compatibility: if login with the regionserver credentials fails, fall back to the master
   * credentials.
   */
  @Override
  protected void login(UserProvider user, String host) throws IOException {
    try {
      super.login(user, host);
    } catch (IOException ie) {
      user.login(SecurityConstants.MASTER_KRB_KEYTAB_FILE, SecurityConstants.MASTER_KRB_PRINCIPAL,
        host);
    }
  }

  /**
   * If configured to put regions on the active master, wait until a backup master becomes active.
   * Otherwise, loop until the server is stopped or aborted.
   */
  @Override
  protected void waitForMasterActive() {
    if (maintenanceMode) {
      return;
    }
    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(conf);
    while (!(tablesOnMaster && activeMaster) && !isStopped() && !isAborted()) {
      sleeper.sleep();
    }
  }

  @InterfaceAudience.Private
  public MasterRpcServices getMasterRpcServices() {
    return (MasterRpcServices) rpcServices;
  }

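  /**
   * Turns the load balancer on or off; delegates to
   * {@link MasterRpcServices#switchBalancer} with {@link BalanceSwitchMode#ASYNC}.
   */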
  public boolean balanceSwitch(final boolean b) throws IOException {
    return getMasterRpcServices().switchBalancer(b, BalanceSwitchMode.ASYNC);
  }

  @Override
  protected String getProcessName() {
    return MASTER;
  }

  @Override
  protected boolean canCreateBaseZNode() {
    return true;
  }

  @Override
  protected boolean canUpdateTableDescriptor() {
    return true;
  }

  @Override
  protected boolean cacheTableDescriptor() {
    return true;
  }

  @Override
  protected RSRpcServices createRpcServices() throws IOException {
    return new MasterRpcServices(this);
  }

  @Override
  protected void configureInfoServer() {
    infoServer.addUnprivilegedServlet("master-status", "/master-status", MasterStatusServlet.class);
    infoServer.addUnprivilegedServlet("api_v1", "/api/v1/*", buildApiV1Servlet());

    infoServer.setAttribute(MASTER, this);
    if (LoadBalancer.isTablesOnMaster(conf)) {
      super.configureInfoServer();
    }
  }

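  // Wraps the REST API v1 resources, assembled by ResourceConfigFactory, in a Jersey
  // ServletContainer so the info server can host them.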
  private ServletHolder buildApiV1Servlet() {
    final ResourceConfig config = ResourceConfigFactory.createResourceConfig(conf, this);
    return new ServletHolder(new ServletContainer(config));
  }

  @Override
  protected Class<? extends HttpServlet> getDumpServlet() {
    return MasterDumpServlet.class;
  }

  @Override
  public MetricsMaster getMasterMetrics() {
    return metricsMaster;
  }

  /**
   * Initialize all ZK based system trackers. But do not include {@link RegionServerTracker}, it
   * should have already been initialized along with {@link ServerManager}.
   */
  private void initializeZKBasedSystemTrackers()
    throws IOException, KeeperException, ReplicationException {
    if (maintenanceMode) {
      // in maintenance mode, always use MaintenanceLoadBalancer.
      conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, MaintenanceLoadBalancer.class,
        LoadBalancer.class);
    }
    this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
    this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
    this.loadBalancerTracker.start();

    this.regionNormalizerManager =
      RegionNormalizerFactory.createNormalizerManager(conf, zooKeeper, this);
    this.configurationManager.registerObserver(regionNormalizerManager);
    this.regionNormalizerManager.start();

    this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
    this.splitOrMergeTracker.start();

    this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf, clusterId);

    this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
    this.drainingServerTracker.start();

    this.snapshotCleanupTracker = new SnapshotCleanupTracker(zooKeeper, this);
    this.snapshotCleanupTracker.start();

    String clientQuorumServers = conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
    boolean clientZkObserverMode = conf.getBoolean(HConstants.CLIENT_ZOOKEEPER_OBSERVER_MODE,
      HConstants.DEFAULT_CLIENT_ZOOKEEPER_OBSERVER_MODE);
    if (clientQuorumServers != null && !clientZkObserverMode) {
      // we need to take care of the ZK information synchronization
      // if the given client ZK nodes are not observer nodes
      ZKWatcher clientZkWatcher = new ZKWatcher(conf,
        getProcessName() + ":" + rpcServices.getSocketAddress().getPort() + "-clientZK", this,
        false, true);
      this.metaLocationSyncer = new MetaLocationSyncer(zooKeeper, clientZkWatcher, this);
      this.metaLocationSyncer.start();
      this.masterAddressSyncer = new MasterAddressSyncer(zooKeeper, clientZkWatcher, this);
      this.masterAddressSyncer.start();
      // setting the cluster id is a one-time effort
      ZKClusterId.setClusterId(clientZkWatcher, fileSystemManager.getClusterId());
    }

    // Set the cluster as up. If new RSs, they'll be waiting on this before
    // going ahead with their startup.
    boolean wasUp = this.clusterStatusTracker.isClusterUp();
    if (!wasUp) this.clusterStatusTracker.setClusterUp();

    LOG.info("Active/primary master=" + this.serverName + ", sessionid=0x"
      + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())
      + ", setting cluster-up flag (Was=" + wasUp + ")");

    // create/initialize the snapshot manager and other procedure managers
    this.snapshotManager = new SnapshotManager();
    this.mpmHost = new MasterProcedureManagerHost();
    this.mpmHost.register(this.snapshotManager);
    this.mpmHost.register(new MasterFlushTableProcedureManager());
    this.mpmHost.loadProcedures(conf);
    this.mpmHost.initialize(this, this.metricsMaster);
  }

  // Will be overridden in tests to inject a customized AssignmentManager
  @InterfaceAudience.Private
  protected AssignmentManager createAssignmentManager(MasterServices master,
    MasterRegion masterRegion) {
    return new AssignmentManager(master, masterRegion);
  }

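  /**
   * Copies meta region locations that earlier versions stored in zookeeper into the master local
   * region. A no-op if the catalog family of the master local region already contains data.
   */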
  private void tryMigrateMetaLocationsFromZooKeeper() throws IOException, KeeperException {
    // try migrate data from zookeeper
    try (ResultScanner scanner =
      masterRegion.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY))) {
      if (scanner.next() != null) {
        // notice that all replicas for a region are in the same row, so the migration can be
        // done within a single-row put, which means that if we have data in the catalog family
        // then we can be sure that the migration is done.
        LOG.info("The {} family in master local region already has data in it, skip migrating...",
          HConstants.CATALOG_FAMILY_STR);
        return;
      }
    }
    // start migrating
    byte[] row = MetaTableAccessor.getMetaKeyForRegion(RegionInfoBuilder.FIRST_META_REGIONINFO);
    Put put = new Put(row);
    List<String> metaReplicaNodes = zooKeeper.getMetaReplicaNodes();
    StringBuilder info = new StringBuilder("Migrating meta locations:");
    for (String metaReplicaNode : metaReplicaNodes) {
      int replicaId = zooKeeper.getZNodePaths().getMetaReplicaIdFromZNode(metaReplicaNode);
      RegionState state = MetaTableLocator.getMetaRegionState(zooKeeper, replicaId);
      info.append(" ").append(state);
      put.setTimestamp(state.getStamp());
      MetaTableAccessor.addRegionInfo(put, state.getRegion());
      if (state.getServerName() != null) {
        MetaTableAccessor.addLocation(put, state.getServerName(), HConstants.NO_SEQNUM, replicaId);
      }
      put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
        .setFamily(HConstants.CATALOG_FAMILY)
        .setQualifier(RegionStateStore.getStateColumn(replicaId)).setTimestamp(put.getTimestamp())
        .setType(Cell.Type.Put).setValue(Bytes.toBytes(state.getState().name())).build());
    }
    if (!put.isEmpty()) {
      LOG.info(info.toString());
      masterRegion.update(r -> r.put(put));
    } else {
      LOG.info("No meta location available on zookeeper, skip migrating...");
    }
  }

  /**
   * Finish initialization of HMaster after becoming the primary master.
   * <p/>
   * The startup order is a bit complicated but very important, do not change it unless you know
   * what you are doing.
   * <ol>
   * <li>Initialize file system based components - file system manager, wal manager, table
   * descriptors, etc</li>
   * <li>Publish cluster id</li>
   * <li>Here comes the most complicated part - initialize server manager, assignment manager and
   * region server tracker
   * <ol type='i'>
   * <li>Create server manager</li>
   * <li>Create master local region</li>
   * <li>Create procedure executor, load the procedures, but do not start workers. We will start it
   * later after we finish scheduling SCPs to avoid scheduling duplicated SCPs for the same
   * server</li>
   * <li>Create assignment manager and start it, load the meta region state, but do not load data
   * from meta region</li>
   * <li>Start region server tracker, construct the online servers set and find out dead servers
   * and schedule SCP for them. The online servers will be constructed by scanning zk, and we will
   * also scan the wal directory to find out possible live region servers, and the differences
   * between these two sets are the dead servers</li>
   * </ol>
   * </li>
   * <li>If this is a new deploy, schedule an InitMetaProcedure to initialize meta</li>
   * <li>Start necessary service threads - balancer, catalog janitor, executor services, and also
   * the procedure executor, etc. Notice that the balancer must be created first as assignment
   * manager may use it when assigning regions.</li>
   * <li>Wait for meta to be initialized if necessary, start table state manager.</li>
   * <li>Wait for enough region servers to check-in</li>
   * <li>Let assignment manager load data from meta and construct region states</li>
   * <li>Start all other things such as chore services, etc</li>
   * </ol>
   * <p/>
   * Notice that now we will not schedule a special procedure to make meta online (unless it is
   * the first time, where meta has not been created yet); we will rely on SCP to bring meta
   * online.
   */
  private void finishActiveMasterInitialization(MonitoredTask status)
    throws IOException, InterruptedException, KeeperException, ReplicationException {
    /*
     * We are active master now... go initialize components we need to run.
     */
    status.setStatus("Initializing Master file system");

    this.masterActiveTime = EnvironmentEdgeManager.currentTime();
    // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.

    // always initialize the MemStoreLAB as we use a region to store data in master now, see
    // localStore.
    initializeMemStoreChunkCreator();
    this.fileSystemManager = new MasterFileSystem(conf);
    this.walManager = new MasterWalManager(this);

    // warm-up HTDs cache on master initialization
    if (preLoadTableDescriptors) {
      status.setStatus("Pre-loading table descriptors");
      this.tableDescriptors.getAll();
    }

    // Publish cluster ID; set it in Master too. The superclass RegionServer does this later but
    // only after it has checked in with the Master. At least a few tests ask Master for clusterId
    // before it has called its run method and before RegionServer has done the reportForDuty.
    ClusterId clusterId = fileSystemManager.getClusterId();
    status.setStatus("Publishing Cluster ID " + clusterId + " in ZooKeeper");
    ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
    this.clusterId = clusterId.toString();

    // Precaution. Put in place the old hbck1 lock file to fence out old hbase1s running their
    // hbck1s against an hbase2 cluster; it could do damage. To skip this behavior, set
    // hbase.write.hbck1.lock.file to false.
    if (this.conf.getBoolean("hbase.write.hbck1.lock.file", true)) {
      Pair<Path, FSDataOutputStream> result = null;
      try {
        result = HBaseFsck.checkAndMarkRunningHbck(this.conf,
          HBaseFsck.createLockRetryCounterFactory(this.conf).create());
      } finally {
        if (result != null) {
          Closeables.close(result.getSecond(), true);
        }
      }
    }

    status.setStatus("Initialize ServerManager and schedule SCP for crash servers");
    // The below two managers must be created before loading procedures, as they will be used
    // during loading.
    // initialize master local region
    masterRegion = MasterRegionFactory.create(this);
    rsListStorage = new MasterRegionServerList(masterRegion, this);

    this.serverManager = createServerManager(this, rsListStorage);
    if (
      !conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
    ) {
      this.splitWALManager = new SplitWALManager(this);
    }

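    // Pull any meta region locations still kept in zookeeper into the master local region
    // before the procedure store is loaded.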
    tryMigrateMetaLocationsFromZooKeeper();

    createProcedureExecutor();
    Map<Class<?>, List<Procedure<MasterProcedureEnv>>> procsByType = procedureExecutor
      .getActiveProceduresNoCopy().stream().collect(Collectors.groupingBy(p -> p.getClass()));

    // Create Assignment Manager
    this.assignmentManager = createAssignmentManager(this, masterRegion);
    this.assignmentManager.start();
    // TODO: TRSP can perform as the sub procedure for other procedures, so even if it is marked as
    // completed, it could still be in the procedure list. This is a bit strange but is another
    // story, need to verify the implementation for ProcedureExecutor and ProcedureStore.
    List<TransitRegionStateProcedure> ritList =
      procsByType.getOrDefault(TransitRegionStateProcedure.class, Collections.emptyList()).stream()
        .filter(p -> !p.isFinished()).map(p -> (TransitRegionStateProcedure) p)
        .collect(Collectors.toList());
    this.assignmentManager.setupRIT(ritList);

    // Start RegionServerTracker with listing of servers found with existing SCPs -- these should
    // be registered in the deadServers set -- and with the list of servernames out on the
    // filesystem that COULD BE 'alive' (we'll schedule SCPs for each and let SCP figure it out).
    // We also pass dirs that are already 'splitting'... so we can do some checks down in tracker.
    // TODO: Generate the splitting and live Set in one pass instead of two as we currently do.
    this.regionServerTracker.upgrade(
      procsByType.getOrDefault(ServerCrashProcedure.class, Collections.emptyList()).stream()
        .map(p -> (ServerCrashProcedure) p).map(p -> p.getServerName()).collect(Collectors.toSet()),
      Sets.union(rsListStorage.getAll(), walManager.getLiveServersFromWALDir()),
      walManager.getSplittingServersFromWALDir());
    // This manager will be started AFTER hbase:meta is confirmed online.
    // hbase.mirror.table.state.to.zookeeper is so hbase1 clients can connect. They read table
    // state from zookeeper while hbase2 reads it from hbase:meta. Disable if no hbase1 clients.
    this.tableStateManager =
      this.conf.getBoolean(MirroringTableStateManager.MIRROR_TABLE_STATE_TO_ZK_KEY, true)
        ? new MirroringTableStateManager(this)
        : new TableStateManager(this);

    status.setStatus("Initializing ZK system trackers");
    initializeZKBasedSystemTrackers();
    // Set ourselves as the active Master now that our claim has succeeded up in zk.
    this.activeMaster = true;

    // Start the Zombie master detector after setting master as active, see HBASE-21535
    Thread zombieDetector = new Thread(new MasterInitializationMonitor(this),
      "ActiveMasterInitializationMonitor-" + EnvironmentEdgeManager.currentTime());
    zombieDetector.setDaemon(true);
    zombieDetector.start();

    // This is for backwards compatibility
    // See HBASE-11393
    status.setStatus("Update TableCFs node in ZNode");
    ReplicationPeerConfigUpgrader tableCFsUpdater =
      new ReplicationPeerConfigUpgrader(zooKeeper, conf);
    tableCFsUpdater.copyTableCFs();

    if (!maintenanceMode) {
      status.setStatus("Initializing master coprocessors");
      setQuotasObserver(conf);
      initializeCoprocessorHost(conf);
    }

    // Checking if meta needs initializing.
    status.setStatus("Initializing meta table if this is a new deploy");
    InitMetaProcedure initMetaProc = null;
    // Print out state of hbase:meta on startup; helps debugging.
    if (!this.assignmentManager.getRegionStates().hasTableRegionStates(TableName.META_TABLE_NAME)) {
      Optional<InitMetaProcedure> optProc = procedureExecutor.getProcedures().stream()
        .filter(p -> p instanceof InitMetaProcedure).map(o -> (InitMetaProcedure) o).findAny();
      initMetaProc = optProc.orElseGet(() -> {
        // schedule an init meta procedure if meta has not been deployed yet
        InitMetaProcedure temp = new InitMetaProcedure();
        procedureExecutor.submitProcedure(temp);
        return temp;
      });
    }
    if (this.balancer instanceof FavoredNodesPromoter) {
      favoredNodesManager = new FavoredNodesManager(this);
    }

    // initialize load balancer
    this.balancer.setMasterServices(this);
    this.balancer.initialize();
    this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());

    // start up all service threads.
    status.setStatus("Initializing master service threads");
    startServiceThreads();
    // wait for meta to be initialized after we start the procedure executor
    if (initMetaProc != null) {
      initMetaProc.await();
      if (initMetaProc.isFailed() && initMetaProc.hasException()) {
        throw new IOException("Failed to initialize meta table", initMetaProc.getException());
      }
    }
    // Wake up this server to check in
    sleeper.skipSleepCycle();

    // Wait for region servers to report in.
    // With this as part of master initialization, it precludes our being able to start a single
    // server that is both Master and RegionServer. Needs more thought. TODO.
    String statusStr = "Wait for region servers to report in";
    status.setStatus(statusStr);
    LOG.info(Objects.toString(status));
    waitForRegionServers(status);

    // Check if master is shutting down because of an issue initializing the regionservers or the
    // balancer.
    if (isStopped()) {
      return;
    }

    status.setStatus("Starting assignment manager");
    // FIRST HBASE:META READ!!!!
    // The below cannot make progress w/o hbase:meta being online.
    // This is the FIRST attempt at going to hbase:meta. Meta on-lining is going on in background
    // as procedures run -- in particular SCPs for crashed servers... One should put up hbase:meta
    // if it is down. It may take a while to come online. So, wait here until meta is for sure
    // available. That's what waitForMetaOnline does.
    if (!waitForMetaOnline()) {
      return;
    }
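    // On a cluster upgraded in place from HBase 1, hbase:meta may still lack the 'table' and
    // 'rep_barrier' families; if so, they are added later by createMissingCFsInMetaDuringUpgrade.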
    TableDescriptor metaDescriptor = tableDescriptors.get(TableName.META_TABLE_NAME);
    final ColumnFamilyDescriptor tableFamilyDesc =
      metaDescriptor.getColumnFamily(HConstants.TABLE_FAMILY);
    final ColumnFamilyDescriptor replBarrierFamilyDesc =
      metaDescriptor.getColumnFamily(HConstants.REPLICATION_BARRIER_FAMILY);

    this.assignmentManager.joinCluster();
    // The below depends on hbase:meta being online.
    try {
      this.tableStateManager.start();
    } catch (NoSuchColumnFamilyException e) {
      if (tableFamilyDesc == null && replBarrierFamilyDesc == null) {
        LOG.info("TableStates manager could not be started. This is expected"
          + " during HBase 1 to 2 upgrade.", e);
      } else {
        throw e;
      }
    }

    this.assignmentManager.processOfflineRegions();
    // this must be called after the above processOfflineRegions to prevent race
    this.assignmentManager.wakeMetaLoadedEvent();

    // for migrating from a version without HBASE-25099, and also for honoring the configuration
    // first.
    if (conf.get(HConstants.META_REPLICAS_NUM) != null) {
      int replicasNumInConf =
        conf.getInt(HConstants.META_REPLICAS_NUM, HConstants.DEFAULT_META_REPLICA_NUM);
      TableDescriptor metaDesc = tableDescriptors.get(TableName.META_TABLE_NAME);
      if (metaDesc.getRegionReplication() != replicasNumInConf) {
        // it is possible that we already have some replicas before upgrading, so we must set the
        // region replication number in meta TableDescriptor directly first, without creating a
        // ModifyTableProcedure, otherwise it may cause a double assign for the meta replicas.
        int existingReplicasCount =
          assignmentManager.getRegionStates().getRegionsOfTable(TableName.META_TABLE_NAME).size();
        if (existingReplicasCount > metaDesc.getRegionReplication()) {
          LOG.info("Update replica count of hbase:meta from {}(in TableDescriptor)"
            + " to {}(existing ZNodes)", metaDesc.getRegionReplication(), existingReplicasCount);
          metaDesc = TableDescriptorBuilder.newBuilder(metaDesc)
            .setRegionReplication(existingReplicasCount).build();
          tableDescriptors.update(metaDesc);
        }
        // check again, and issue a ModifyTableProcedure if needed
        if (metaDesc.getRegionReplication() != replicasNumInConf) {
          LOG.info(
            "The {} config is {} while the replica count in TableDescriptor is {}"
              + " for hbase:meta, altering...",
            HConstants.META_REPLICAS_NUM, replicasNumInConf, metaDesc.getRegionReplication());
          procedureExecutor.submitProcedure(new ModifyTableProcedure(
            procedureExecutor.getEnvironment(), TableDescriptorBuilder.newBuilder(metaDesc)
              .setRegionReplication(replicasNumInConf).build(),
            null, metaDesc, false));
        }
      }
    }
    // Initialize after meta is up as below scans meta
    if (favoredNodesManager != null && !maintenanceMode) {
      SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment =
        new SnapshotOfRegionAssignmentFromMeta(getConnection());
      snapshotOfRegionAssignment.initialize();
      favoredNodesManager.initialize(snapshotOfRegionAssignment);
    }

    // set cluster status again after user regions are assigned
    this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());

    // Start balancer and meta catalog janitor after meta and regions have been assigned.
    status.setStatus("Starting balancer and catalog janitor");
    this.clusterStatusChore = new ClusterStatusChore(this, balancer);
    getChoreService().scheduleChore(clusterStatusChore);
    this.balancerChore = new BalancerChore(this);
    if (!disableBalancerChoreForTest) {
      getChoreService().scheduleChore(balancerChore);
    }
    if (regionNormalizerManager != null) {
      getChoreService().scheduleChore(regionNormalizerManager.getRegionNormalizerChore());
    }
    this.catalogJanitorChore = new CatalogJanitor(this);
    getChoreService().scheduleChore(catalogJanitorChore);
    this.hbckChore = new HbckChore(this);
    getChoreService().scheduleChore(hbckChore);

    // NAMESPACE READ!!!!
    // Here we expect hbase:namespace to be online. See inside initClusterSchemaService.
    // TODO: Fix this. Namespace is a pain being a sort-of system table. Fold it in to hbase:meta.
    // isNamespace does like isMeta and waits until namespace is onlined before allowing progress.
    if (!waitForNamespaceOnline()) {
      return;
    }
    status.setStatus("Starting cluster schema service");
    try {
      initClusterSchemaService();
    } catch (IllegalStateException e) {
      if (
        e.getCause() != null && e.getCause() instanceof NoSuchColumnFamilyException
          && tableFamilyDesc == null && replBarrierFamilyDesc == null
      ) {
        LOG.info("ClusterSchema service could not be initialized. This is "
          + "expected during HBase 1 to 2 upgrade", e);
      } else {
        throw e;
      }
    }

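    // Give coprocessors a chance to run before we flip the 'initialized' flag; a failure here
    // is logged but does not abort master startup.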
This is " 1166 + "expected during HBase 1 to 2 upgrade", e); 1167 } else { 1168 throw e; 1169 } 1170 } 1171 1172 if (this.cpHost != null) { 1173 try { 1174 this.cpHost.preMasterInitialization(); 1175 } catch (IOException e) { 1176 LOG.error("Coprocessor preMasterInitialization() hook failed", e); 1177 } 1178 } 1179 1180 status.markComplete("Initialization successful"); 1181 LOG.info(String.format("Master has completed initialization %.3fsec", 1182 (EnvironmentEdgeManager.currentTime() - masterActiveTime) / 1000.0f)); 1183 this.masterFinishedInitializationTime = EnvironmentEdgeManager.currentTime(); 1184 configurationManager.registerObserver(this.balancer); 1185 configurationManager.registerObserver(this.logCleanerPool); 1186 configurationManager.registerObserver(this.logCleaner); 1187 configurationManager.registerObserver(this.regionsRecoveryConfigManager); 1188 configurationManager.registerObserver(this.exclusiveHFileCleanerPool); 1189 if (this.sharedHFileCleanerPool != null) { 1190 configurationManager.registerObserver(this.sharedHFileCleanerPool); 1191 } 1192 if (this.hfileCleaners != null) { 1193 for (HFileCleaner cleaner : hfileCleaners) { 1194 configurationManager.registerObserver(cleaner); 1195 } 1196 } 1197 // Set master as 'initialized'. 1198 setInitialized(true); 1199 1200 if (tableFamilyDesc == null && replBarrierFamilyDesc == null) { 1201 // create missing CFs in meta table after master is set to 'initialized'. 1202 createMissingCFsInMetaDuringUpgrade(metaDescriptor); 1203 1204 // Throwing this Exception to abort active master is painful but this 1205 // seems the only way to add missing CFs in meta while upgrading from 1206 // HBase 1 to 2 (where HBase 2 has HBASE-23055 & HBASE-23782 checked-in). 1207 // So, why do we abort active master after adding missing CFs in meta? 1208 // When we reach here, we would have already bypassed NoSuchColumnFamilyException 1209 // in initClusterSchemaService(), meaning ClusterSchemaService is not 1210 // correctly initialized but we bypassed it. Similarly, we bypassed 1211 // tableStateManager.start() as well. Hence, we should better abort 1212 // current active master because our main task - adding missing CFs 1213 // in meta table is done (possible only after master state is set as 1214 // initialized) at the expense of bypassing few important tasks as part 1215 // of active master init routine. So now we abort active master so that 1216 // next active master init will not face any issues and all mandatory 1217 // services will be started during master init phase. 1218 throw new PleaseRestartMasterException("Aborting active master after missing" 1219 + " CFs are successfully added in meta. 
Subsequent active master "
1220 + "initialization should be uninterrupted");
1221 }
1222
1223 if (maintenanceMode) {
1224 LOG.info("Detected repair mode, skipping final initialization steps.");
1225 return;
1226 }
1227
1228 assignmentManager.checkIfShouldMoveSystemRegionAsync();
1229 status.setStatus("Starting quota manager");
1230 initQuotaManager();
1231 if (QuotaUtil.isQuotaEnabled(conf)) {
1232 // Create the quota snapshot notifier
1233 spaceQuotaSnapshotNotifier = createQuotaSnapshotNotifier();
1234 spaceQuotaSnapshotNotifier.initialize(getClusterConnection());
1235 this.quotaObserverChore = new QuotaObserverChore(this, getMasterMetrics());
1236 // Start the chore to read the region FS space reports and act on them
1237 getChoreService().scheduleChore(quotaObserverChore);
1238
1239 this.snapshotQuotaChore = new SnapshotQuotaObserverChore(this, getMasterMetrics());
1240 // Start the chore to read snapshots and add their usage to table/NS quotas
1241 getChoreService().scheduleChore(snapshotQuotaChore);
1242 }
1243 final SlowLogMasterService slowLogMasterService = new SlowLogMasterService(conf, this);
1244 slowLogMasterService.init();
1245
1246 // Clear dead servers whose host name and port match an online server. We deliberately do not
1247 // remove such a dead server when a region server with the same host name and port is trying to
1248 // check in before master initialization completes. See HBASE-5916.
1249 this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
1250
1251 // Check and set the znode ACLs if needed in case we are overtaking a non-secure configuration
1252 status.setStatus("Checking ZNode ACLs");
1253 zooKeeper.checkAndSetZNodeAcls();
1254
1255 status.setStatus("Initializing MOB Cleaner");
1256 initMobCleaner();
1257
1258 status.setStatus("Calling postStartMaster coprocessors");
1259 if (this.cpHost != null) {
1260 // don't let cp initialization errors kill the master
1261 try {
1262 this.cpHost.postStartMaster();
1263 } catch (IOException ioe) {
1264 LOG.error("Coprocessor postStartMaster() hook failed", ioe);
1265 }
1266 }
1267
1268 zombieDetector.interrupt();
1269
1270 /*
1271 * After master has started up, let's do balancer post-startup initialization. Since this runs
1272 * in the activeMasterManager thread, it should be fine.
1273 */
1274 long start = EnvironmentEdgeManager.currentTime();
1275 this.balancer.postMasterStartupInitialize();
1276 if (LOG.isDebugEnabled()) {
1277 LOG.debug("Balancer post startup initialization complete, took "
1278 + ((EnvironmentEdgeManager.currentTime() - start) / 1000) + " seconds");
1279 }
1280
1281 this.rollingUpgradeChore = new RollingUpgradeChore(this);
1282 getChoreService().scheduleChore(rollingUpgradeChore);
1283 }
1284
1285 private void createMissingCFsInMetaDuringUpgrade(TableDescriptor metaDescriptor)
1286 throws IOException {
1287 TableDescriptor newMetaDesc = TableDescriptorBuilder.newBuilder(metaDescriptor)
1288 .setColumnFamily(FSTableDescriptors.getTableFamilyDescForMeta(conf))
1289 .setColumnFamily(FSTableDescriptors.getReplBarrierFamilyDescForMeta()).build();
1290 long pid = this.modifyTable(TableName.META_TABLE_NAME, () -> newMetaDesc, 0, 0, false);
1291 int tries = 30;
1292 while (
1293 !(getMasterProcedureExecutor().isFinished(pid)) && getMasterProcedureExecutor().isRunning()
1294 && tries > 0
1295 ) {
1296 try {
1297 Thread.sleep(1000);
1298 } catch (InterruptedException e) {
1299 throw new IOException("Wait interrupted", e);
1300 }
1301 tries--;
1302 }
1303 if (tries <= 0) {
1304 throw new HBaseIOException(
1305 "Failed to add table and rep_barrier CFs to meta in a given time.");
1306 } else {
1307 Procedure<?> result = getMasterProcedureExecutor().getResult(pid);
1308 if (result != null && result.isFailed()) {
1309 throw new IOException("Failed to add table and rep_barrier CFs to meta. "
1310 + MasterProcedureUtil.unwrapRemoteIOException(result));
1311 }
1312 }
1313 }
1314
1315 /**
1316 * Check hbase:meta is up and ready for reading. For use during Master startup only.
1317 * @return True if meta is UP and online and startup can progress. Otherwise, meta is not online
1318 * and we will hold here until operator intervention.
1319 */
1320 @InterfaceAudience.Private
1321 public boolean waitForMetaOnline() {
1322 return isRegionOnline(RegionInfoBuilder.FIRST_META_REGIONINFO);
1323 }
1324
1325 /**
1326 * @return True if the region is online and scannable; else false on error or shutdown
1327 * (otherwise we just block in here, holding up all forward progress).
1328 */
1329 private boolean isRegionOnline(RegionInfo ri) {
1330 RetryCounter rc = null;
1331 while (!isStopped()) {
1332 RegionState rs = this.assignmentManager.getRegionStates().getRegionState(ri);
1333 if (rs.isOpened()) {
1334 if (this.getServerManager().isServerOnline(rs.getServerName())) {
1335 return true;
1336 }
1337 }
1338 // Region is not OPEN.
1339 Optional<Procedure<MasterProcedureEnv>> optProc = this.procedureExecutor.getProcedures()
1340 .stream().filter(p -> p instanceof ServerCrashProcedure).findAny();
1341 // TODO: Add a page to refguide on how to do repair. Have this log message point to it.
1342 // Page will talk about loss of edits, how to schedule at least the meta WAL recovery, and
1343 // then how to assign including how to break region lock if one held.
1344 LOG.warn(
1345 "{} is NOT online; state={}; ServerCrashProcedures={}. Master startup cannot "
1346 + "progress, in holding-pattern until region onlined.",
1347 ri.getRegionNameAsString(), rs, optProc.isPresent());
1348 // Check once-a-minute.
1349 if (rc == null) {
1350 rc = new RetryCounterFactory(Integer.MAX_VALUE, 1000, 60_000).create();
1351 }
1352 Threads.sleep(rc.getBackoffTimeAndIncrementAttempts());
1353 }
1354 return false;
1355 }
1356
1357 /**
1358 * Check hbase:namespace table is assigned. If not, startup will hang looking for the ns table
1359 * (TODO: Fix this! NS should not hold up startup.)
1360 * @return True if the namespace table is up/online.
1361 */
1362 @InterfaceAudience.Private
1363 public boolean waitForNamespaceOnline() {
1364 List<RegionInfo> ris =
1365 this.assignmentManager.getRegionStates().getRegionsOfTable(TableName.NAMESPACE_TABLE_NAME);
1366 if (ris.isEmpty()) {
1367 // If empty, means we've not assigned the namespace table yet... Just return true so startup
1368 // continues and the namespace table gets created.
1369 return true;
1370 }
1371 // Else there are namespace regions up in meta. Ensure they are assigned before we go on.
1372 for (RegionInfo ri : ris) {
1373 if (!isRegionOnline(ri)) {
1374 return false;
1375 }
1376 }
1377 return true;
1378 }
1379
1380 /**
1381 * Adds the {@code MasterQuotasObserver} to the list of configured Master observers to
1382 * automatically remove quotas for a table when that table is deleted.
1383 */
1384 @InterfaceAudience.Private
1385 public void updateConfigurationForQuotasObserver(Configuration conf) {
1386 // We're configured to not delete quotas on table deletion, so we don't need to add the observer.
1387 if (
1388 !conf.getBoolean(MasterQuotasObserver.REMOVE_QUOTA_ON_TABLE_DELETE,
1389 MasterQuotasObserver.REMOVE_QUOTA_ON_TABLE_DELETE_DEFAULT)
1390 ) {
1391 return;
1392 }
1393 String[] masterCoprocs = conf.getStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY);
1394 final int length = null == masterCoprocs ? 0 : masterCoprocs.length;
1395 String[] updatedCoprocs = new String[length + 1];
1396 if (length > 0) {
1397 System.arraycopy(masterCoprocs, 0, updatedCoprocs, 0, masterCoprocs.length);
1398 }
1399 updatedCoprocs[length] = MasterQuotasObserver.class.getName();
1400 conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, updatedCoprocs);
1401 }
1402
1403 private void initMobCleaner() {
1404 this.mobFileCleanerChore = new MobFileCleanerChore(this);
1405 getChoreService().scheduleChore(mobFileCleanerChore);
1406 this.mobFileCompactionChore = new MobFileCompactionChore(this);
1407 getChoreService().scheduleChore(mobFileCompactionChore);
1408 }
1409
1410 /**
1411 * <p>
1412 * Create a {@link ServerManager} instance.
1413 * </p>
1414 * <p>
1415 * Will be overridden in tests.
1416 * </p>
1417 */
1418 @InterfaceAudience.Private
1419 protected ServerManager createServerManager(MasterServices master, RegionServerList storage)
1420 throws IOException {
1421 // We put this out here in a method so we can do a Mockito.spy and stub it out
1422 // with a mocked-up ServerManager.
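// For instance, a test could subclass HMaster and override this method to return a
// Mockito spy (a sketch only; the override below is illustrative, not an existing test):
//
//   @Override
//   protected ServerManager createServerManager(MasterServices master, RegionServerList storage)
//       throws IOException {
//     setupClusterConnection();
//     return Mockito.spy(new ServerManager(master, storage));
//   }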
1423 setupClusterConnection();
1424 return new ServerManager(master, storage);
1425 }
1426
1427 private void waitForRegionServers(final MonitoredTask status)
1428 throws IOException, InterruptedException {
1429 this.serverManager.waitForRegionServers(status);
1430 }
1431
1432 // Will be overridden in tests
1433 @InterfaceAudience.Private
1434 protected void initClusterSchemaService() throws IOException, InterruptedException {
1435 this.clusterSchemaService = new ClusterSchemaServiceImpl(this);
1436 this.clusterSchemaService.startAsync();
1437 try {
1438 this.clusterSchemaService
1439 .awaitRunning(getConfiguration().getInt(HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS,
1440 DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS), TimeUnit.SECONDS);
1441 } catch (TimeoutException toe) {
1442 throw new IOException("Timed out starting ClusterSchemaService", toe);
1443 }
1444 }
1445
1446 private void initQuotaManager() throws IOException {
1447 MasterQuotaManager quotaManager = new MasterQuotaManager(this);
1448 quotaManager.start();
1449 this.quotaManager = quotaManager;
1450 }
1451
1452 private SpaceQuotaSnapshotNotifier createQuotaSnapshotNotifier() {
1453 SpaceQuotaSnapshotNotifier notifier =
1454 SpaceQuotaSnapshotNotifierFactory.getInstance().create(getConfiguration());
1455 return notifier;
1456 }
1457
1458 public boolean isCatalogJanitorEnabled() {
1459 return catalogJanitorChore != null ? catalogJanitorChore.getEnabled() : false;
1460 }
1461
1462 boolean isCleanerChoreEnabled() {
1463 boolean hfileCleanerFlag = true, logCleanerFlag = true;
1464
1465 if (getHFileCleaner() != null) {
1466 hfileCleanerFlag = getHFileCleaner().getEnabled();
1467 }
1468
1469 if (logCleaner != null) {
1470 logCleanerFlag = logCleaner.getEnabled();
1471 }
1472
1473 return (hfileCleanerFlag && logCleanerFlag);
1474 }
1475
1476 @Override
1477 public ServerManager getServerManager() {
1478 return this.serverManager;
1479 }
1480
1481 @Override
1482 public MasterFileSystem getMasterFileSystem() {
1483 return this.fileSystemManager;
1484 }
1485
1486 @Override
1487 public MasterWalManager getMasterWalManager() {
1488 return this.walManager;
1489 }
1490
1491 @Override
1492 public SplitWALManager getSplitWALManager() {
1493 return splitWALManager;
1494 }
1495
1496 @Override
1497 public TableStateManager getTableStateManager() {
1498 return tableStateManager;
1499 }
1500
1501 /*
1502 * Start up all services. If any of these threads gets an unhandled exception then they just die
1503 * with a logged message. This should be fine because in general, we do not expect the master to
1504 * get unhandled exceptions such as OOMEs; it should be lightly loaded. See what HRegionServer
1505 * does if you need to install an unhandled exception handler.
1506 */ 1507 private void startServiceThreads() throws IOException { 1508 // Start the executor service pools 1509 final int masterOpenRegionPoolSize = conf.getInt(HConstants.MASTER_OPEN_REGION_THREADS, 1510 HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT); 1511 executorService.startExecutorService(executorService.new ExecutorConfig() 1512 .setExecutorType(ExecutorType.MASTER_OPEN_REGION).setCorePoolSize(masterOpenRegionPoolSize)); 1513 final int masterCloseRegionPoolSize = conf.getInt(HConstants.MASTER_CLOSE_REGION_THREADS, 1514 HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT); 1515 executorService.startExecutorService( 1516 executorService.new ExecutorConfig().setExecutorType(ExecutorType.MASTER_CLOSE_REGION) 1517 .setCorePoolSize(masterCloseRegionPoolSize)); 1518 final int masterServerOpThreads = conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS, 1519 HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT); 1520 executorService.startExecutorService( 1521 executorService.new ExecutorConfig().setExecutorType(ExecutorType.MASTER_SERVER_OPERATIONS) 1522 .setCorePoolSize(masterServerOpThreads)); 1523 final int masterServerMetaOpsThreads = 1524 conf.getInt(HConstants.MASTER_META_SERVER_OPERATIONS_THREADS, 1525 HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT); 1526 executorService.startExecutorService(executorService.new ExecutorConfig() 1527 .setExecutorType(ExecutorType.MASTER_META_SERVER_OPERATIONS) 1528 .setCorePoolSize(masterServerMetaOpsThreads)); 1529 final int masterLogReplayThreads = conf.getInt(HConstants.MASTER_LOG_REPLAY_OPS_THREADS, 1530 HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT); 1531 executorService.startExecutorService(executorService.new ExecutorConfig() 1532 .setExecutorType(ExecutorType.M_LOG_REPLAY_OPS).setCorePoolSize(masterLogReplayThreads)); 1533 final int masterSnapshotThreads = conf.getInt(SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, 1534 SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT); 1535 executorService.startExecutorService( 1536 executorService.new ExecutorConfig().setExecutorType(ExecutorType.MASTER_SNAPSHOT_OPERATIONS) 1537 .setCorePoolSize(masterSnapshotThreads).setAllowCoreThreadTimeout(true)); 1538 final int masterMergeDispatchThreads = conf.getInt(HConstants.MASTER_MERGE_DISPATCH_THREADS, 1539 HConstants.MASTER_MERGE_DISPATCH_THREADS_DEFAULT); 1540 executorService.startExecutorService( 1541 executorService.new ExecutorConfig().setExecutorType(ExecutorType.MASTER_MERGE_OPERATIONS) 1542 .setCorePoolSize(masterMergeDispatchThreads).setAllowCoreThreadTimeout(true)); 1543 1544 // We depend on there being only one instance of this executor running 1545 // at a time. To do concurrency, would need fencing of enable/disable of 1546 // tables. 
1547 // Any time changing this maxThreads to > 1, pls see the comment at 1548 // AccessController#postCompletedCreateTableAction 1549 executorService.startExecutorService(executorService.new ExecutorConfig() 1550 .setExecutorType(ExecutorType.MASTER_TABLE_OPERATIONS).setCorePoolSize(1)); 1551 startProcedureExecutor(); 1552 1553 // Create log cleaner thread pool 1554 logCleanerPool = DirScanPool.getLogCleanerScanPool(conf); 1555 Map<String, Object> params = new HashMap<>(); 1556 params.put(MASTER, this); 1557 // Start log cleaner thread 1558 int cleanerInterval = 1559 conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL); 1560 this.logCleaner = 1561 new LogCleaner(cleanerInterval, this, conf, getMasterWalManager().getFileSystem(), 1562 getMasterWalManager().getOldLogDir(), logCleanerPool, params); 1563 getChoreService().scheduleChore(logCleaner); 1564 1565 Path archiveDir = HFileArchiveUtil.getArchivePath(conf); 1566 1567 // Create custom archive hfile cleaners 1568 String[] paths = conf.getStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS); 1569 // todo: handle the overlap issues for the custom paths 1570 1571 if (paths != null && paths.length > 0) { 1572 if (conf.getStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS) == null) { 1573 Set<String> cleanerClasses = new HashSet<>(); 1574 String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS); 1575 if (cleaners != null) { 1576 Collections.addAll(cleanerClasses, cleaners); 1577 } 1578 conf.setStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS, 1579 cleanerClasses.toArray(new String[cleanerClasses.size()])); 1580 LOG.info("Archive custom cleaner paths: {}, plugins: {}", Arrays.asList(paths), 1581 cleanerClasses); 1582 } 1583 // share the hfile cleaner pool in custom paths 1584 sharedHFileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf.get(CUSTOM_POOL_SIZE, "6")); 1585 for (int i = 0; i < paths.length; i++) { 1586 Path path = new Path(paths[i].trim()); 1587 HFileCleaner cleaner = 1588 new HFileCleaner("ArchiveCustomHFileCleaner-" + path.getName(), cleanerInterval, this, 1589 conf, getMasterFileSystem().getFileSystem(), new Path(archiveDir, path), 1590 HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS, sharedHFileCleanerPool, params, null); 1591 hfileCleaners.add(cleaner); 1592 hfileCleanerPaths.add(path); 1593 } 1594 } 1595 1596 // Create the whole archive dir cleaner thread pool 1597 exclusiveHFileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf); 1598 hfileCleaners.add(0, 1599 new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem().getFileSystem(), 1600 archiveDir, exclusiveHFileCleanerPool, params, hfileCleanerPaths)); 1601 hfileCleanerPaths.add(0, archiveDir); 1602 // Schedule all the hfile cleaners 1603 for (HFileCleaner hFileCleaner : hfileCleaners) { 1604 getChoreService().scheduleChore(hFileCleaner); 1605 } 1606 1607 // Regions Reopen based on very high storeFileRefCount is considered enabled 1608 // only if hbase.regions.recovery.store.file.ref.count has value > 0 1609 final int maxStoreFileRefCount = conf.getInt(HConstants.STORE_FILE_REF_COUNT_THRESHOLD, 1610 HConstants.DEFAULT_STORE_FILE_REF_COUNT_THRESHOLD); 1611 if (maxStoreFileRefCount > 0) { 1612 this.regionsRecoveryChore = new RegionsRecoveryChore(this, conf, this); 1613 getChoreService().scheduleChore(this.regionsRecoveryChore); 1614 } else { 1615 LOG.info( 1616 "Reopening regions with very high storeFileRefCount is disabled. 
" 1617 + "Provide threshold value > 0 for {} to enable it.", 1618 HConstants.STORE_FILE_REF_COUNT_THRESHOLD); 1619 } 1620 1621 this.regionsRecoveryConfigManager = new RegionsRecoveryConfigManager(this); 1622 1623 replicationBarrierCleaner = 1624 new ReplicationBarrierCleaner(conf, this, getConnection(), replicationPeerManager); 1625 getChoreService().scheduleChore(replicationBarrierCleaner); 1626 1627 final boolean isSnapshotChoreEnabled = this.snapshotCleanupTracker.isSnapshotCleanupEnabled(); 1628 this.snapshotCleanerChore = new SnapshotCleanerChore(this, conf, getSnapshotManager()); 1629 if (isSnapshotChoreEnabled) { 1630 getChoreService().scheduleChore(this.snapshotCleanerChore); 1631 } else { 1632 if (LOG.isTraceEnabled()) { 1633 LOG.trace("Snapshot Cleaner Chore is disabled. Not starting up the chore.."); 1634 } 1635 } 1636 serviceStarted = true; 1637 if (LOG.isTraceEnabled()) { 1638 LOG.trace("Started service threads"); 1639 } 1640 } 1641 1642 @Override 1643 protected void stopServiceThreads() { 1644 if (masterJettyServer != null) { 1645 LOG.info("Stopping master jetty server"); 1646 try { 1647 masterJettyServer.stop(); 1648 } catch (Exception e) { 1649 LOG.error("Failed to stop master jetty server", e); 1650 } 1651 } 1652 stopChores(); 1653 1654 super.stopServiceThreads(); 1655 if (exclusiveHFileCleanerPool != null) { 1656 exclusiveHFileCleanerPool.shutdownNow(); 1657 exclusiveHFileCleanerPool = null; 1658 } 1659 if (logCleanerPool != null) { 1660 logCleanerPool.shutdownNow(); 1661 logCleanerPool = null; 1662 } 1663 if (sharedHFileCleanerPool != null) { 1664 sharedHFileCleanerPool.shutdownNow(); 1665 sharedHFileCleanerPool = null; 1666 } 1667 1668 LOG.debug("Stopping service threads"); 1669 1670 // stop procedure executor prior to other services such as server manager and assignment 1671 // manager, as these services are important for some running procedures. See HBASE-24117 for 1672 // example. 
1673 stopProcedureExecutor(); 1674 1675 if (regionNormalizerManager != null) { 1676 regionNormalizerManager.stop(); 1677 } 1678 if (this.quotaManager != null) { 1679 this.quotaManager.stop(); 1680 } 1681 1682 if (this.activeMasterManager != null) { 1683 this.activeMasterManager.stop(); 1684 } 1685 if (this.serverManager != null) { 1686 this.serverManager.stop(); 1687 } 1688 if (this.assignmentManager != null) { 1689 this.assignmentManager.stop(); 1690 } 1691 1692 if (masterRegion != null) { 1693 masterRegion.close(isAborted()); 1694 } 1695 if (this.walManager != null) { 1696 this.walManager.stop(); 1697 } 1698 if (this.fileSystemManager != null) { 1699 this.fileSystemManager.stop(); 1700 } 1701 if (this.mpmHost != null) { 1702 this.mpmHost.stop("server shutting down."); 1703 } 1704 if (this.regionServerTracker != null) { 1705 this.regionServerTracker.stop(); 1706 } 1707 } 1708 1709 private void createProcedureExecutor() throws IOException { 1710 MasterProcedureEnv procEnv = new MasterProcedureEnv(this); 1711 procedureStore = new RegionProcedureStore(this, masterRegion, 1712 new MasterProcedureEnv.FsUtilsLeaseRecovery(this)); 1713 procedureStore.registerListener(new ProcedureStoreListener() { 1714 1715 @Override 1716 public void abortProcess() { 1717 abort("The Procedure Store lost the lease", null); 1718 } 1719 }); 1720 MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler(); 1721 procedureExecutor = new ProcedureExecutor<>(conf, procEnv, procedureStore, procedureScheduler); 1722 configurationManager.registerObserver(procEnv); 1723 1724 int cpus = Runtime.getRuntime().availableProcessors(); 1725 final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, Math.max( 1726 (cpus > 0 ? cpus / 4 : 0), MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS)); 1727 final boolean abortOnCorruption = 1728 conf.getBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION, 1729 MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION); 1730 procedureStore.start(numThreads); 1731 // Just initialize it but do not start the workers, we will start the workers later by calling 1732 // startProcedureExecutor. See the javadoc for finishActiveMasterInitialization for more 1733 // details. 
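// For example, with 16 available processors and hbase.master.procedure.threads unset,
// numThreads above works out to max(16 / 4, DEFAULT_MIN_MASTER_PROCEDURE_THREADS), so the
// configured floor wins whenever cpus / 4 falls below it.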
1734 procedureExecutor.init(numThreads, abortOnCorruption);
1735 if (!procEnv.getRemoteDispatcher().start()) {
1736 throw new HBaseIOException("Failed start of remote dispatcher");
1737 }
1738 }
1739
1740 // Will be overridden in UT
1741 protected void startProcedureExecutor() throws IOException {
1742 procedureExecutor.startWorkers();
1743 }
1744
1745 /**
1746 * Turn on/off Snapshot Cleanup Chore
1747 * @param on indicates whether Snapshot Cleanup Chore is to be run
1748 */
1749 void switchSnapshotCleanup(final boolean on, final boolean synchronous) {
1750 if (synchronous) {
1751 synchronized (this.snapshotCleanerChore) {
1752 switchSnapshotCleanup(on);
1753 }
1754 } else {
1755 switchSnapshotCleanup(on);
1756 }
1757 }
1758
1759 private void switchSnapshotCleanup(final boolean on) {
1760 try {
1761 snapshotCleanupTracker.setSnapshotCleanupEnabled(on);
1762 if (on) {
1763 getChoreService().scheduleChore(this.snapshotCleanerChore);
1764 } else {
1765 this.snapshotCleanerChore.cancel();
1766 }
1767 } catch (KeeperException e) {
1768 LOG.error("Error updating snapshot cleanup mode to {}", on, e);
1769 }
1770 }
1771
1772 private void stopProcedureExecutor() {
1773 if (procedureExecutor != null) {
1774 configurationManager.deregisterObserver(procedureExecutor.getEnvironment());
1775 procedureExecutor.getEnvironment().getRemoteDispatcher().stop();
1776 procedureExecutor.stop();
1777 procedureExecutor.join();
1778 procedureExecutor = null;
1779 }
1780
1781 if (procedureStore != null) {
1782 procedureStore.stop(isAborted());
1783 procedureStore = null;
1784 }
1785 }
1786
1787 private void stopChores() {
1788 if (getChoreService() != null) {
1789 shutdownChore(mobFileCleanerChore);
1790 shutdownChore(mobFileCompactionChore);
1791 shutdownChore(balancerChore);
1792 if (regionNormalizerManager != null) {
1793 shutdownChore(regionNormalizerManager.getRegionNormalizerChore());
1794 }
1795 shutdownChore(clusterStatusChore);
1796 shutdownChore(catalogJanitorChore);
1797 shutdownChore(clusterStatusPublisherChore);
1798 shutdownChore(snapshotQuotaChore);
1799 shutdownChore(logCleaner);
1800 if (hfileCleaners != null) {
1801 for (ScheduledChore chore : hfileCleaners) {
1802 chore.shutdown();
1803 }
1804 hfileCleaners = null;
1805 }
1806 shutdownChore(replicationBarrierCleaner);
1807 shutdownChore(snapshotCleanerChore);
1808 shutdownChore(hbckChore);
1809 shutdownChore(regionsRecoveryChore);
1810 shutdownChore(rollingUpgradeChore);
1811 }
1812 }
1813
1814 /** Returns the remote side's InetAddress. */
1815 InetAddress getRemoteInetAddress(final int port, final long serverStartCode)
1816 throws UnknownHostException {
1817 // Do it out here in its own little method so we can fake an address when
1818 // mocking up in tests.
1819 InetAddress ia = RpcServer.getRemoteIp();
1820
1821 // The call could be from the local regionserver,
1822 // in which case, there is no remote address.
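// (RpcServer.getRemoteIp() returns null when we are not inside an RPC call; a matching
// start code below identifies the caller as this same process, so we fall back to our own
// RPC socket address.)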
1823 if (ia == null && serverStartCode == startcode) {
1824 InetSocketAddress isa = rpcServices.getSocketAddress();
1825 if (isa != null && isa.getPort() == port) {
1826 ia = isa.getAddress();
1827 }
1828 }
1829 return ia;
1830 }
1831
1832 /** Returns the maximum time we should run the balancer for */
1833 private int getMaxBalancingTime() {
1834 // if max balancing time isn't set, default it to the balancer period
1835 int maxBalancingTime =
1836 getConfiguration().getInt(HConstants.HBASE_BALANCER_MAX_BALANCING, getConfiguration()
1837 .getInt(HConstants.HBASE_BALANCER_PERIOD, HConstants.DEFAULT_HBASE_BALANCER_PERIOD));
1838 return maxBalancingTime;
1839 }
1840
1841 /** Returns the maximum number of regions in transition */
1842 private int getMaxRegionsInTransition() {
1843 int numRegions = this.assignmentManager.getRegionStates().getRegionAssignments().size();
1844 return Math.max((int) Math.floor(numRegions * this.maxRitPercent), 1);
1845 }
1846
1847 /**
1848 * First sleeps until the next balance plan start time; meanwhile, throttles by the max number
1849 * of regions in transition to protect availability.
1850 * @param nextBalanceStartTime The next balance plan start time
1851 * @param maxRegionsInTransition max number of regions in transition
1852 * @param cutoffTime when to exit balancer
1853 */
1854 private void balanceThrottling(long nextBalanceStartTime, int maxRegionsInTransition,
1855 long cutoffTime) {
1856 boolean interrupted = false;
1857
1858 // Sleep to next balance plan start time
1859 // But if there are zero regions in transition, we can skip sleeping to speed things up.
1860 while (
1861 !interrupted && EnvironmentEdgeManager.currentTime() < nextBalanceStartTime
1862 && this.assignmentManager.getRegionStates().hasRegionsInTransition()
1863 ) {
1864 try {
1865 Thread.sleep(100);
1866 } catch (InterruptedException ie) {
1867 interrupted = true;
1868 }
1869 }
1870
1871 // Throttle by the max number of regions in transition
1872 while (
1873 !interrupted && maxRegionsInTransition > 0
1874 && this.assignmentManager.getRegionStates().getRegionsInTransitionCount()
1875 >= maxRegionsInTransition
1876 && EnvironmentEdgeManager.currentTime() <= cutoffTime
1877 ) {
1878 try {
1879 // sleep if the number of regions in transition exceeds the limit
1880 Thread.sleep(100);
1881 } catch (InterruptedException ie) {
1882 interrupted = true;
1883 }
1884 }
1885
1886 if (interrupted) Thread.currentThread().interrupt();
1887 }
1888
1889 public BalanceResponse balance() throws IOException {
1890 return balance(BalanceRequest.defaultInstance());
1891 }
1892
1893 /**
1894 * Trigger a normal balance; see {@link HMaster#balance()}. If the balance is not executed this
1895 * time, the metrics related to the balance will be updated. When the balance is running, related
1896 * metrics will be updated at the same time. But if some checking logic fails and causes the
1897 * balancer to exit early, we lose the chance to update balancer metrics, leaving users without
1898 * the latest balancer info.
1899 */
1900 public BalanceResponse balanceOrUpdateMetrics() throws IOException {
1901 synchronized (this.balancer) {
1902 BalanceResponse response = balance();
1903 if (!response.isBalancerRan()) {
1904 Map<TableName, Map<ServerName, List<RegionInfo>>> assignments =
1905 this.assignmentManager.getRegionStates().getAssignmentsForBalancer(this.tableStateManager,
1906 this.serverManager.getOnlineServersList());
1907 for (Map<ServerName, List<RegionInfo>> serverMap : assignments.values()) {
1908 serverMap.keySet().removeAll(this.serverManager.getDrainingServersList());
1909 }
1910 this.balancer.updateBalancerLoadInfo(assignments);
1911 }
1912 return response;
1913 }
1914 }
1915
1916 /**
1917 * Checks master state before initiating action over region topology.
1918 * @param action the name of the action under consideration, for logging.
1919 * @return {@code true} when the caller should exit early, {@code false} otherwise.
1920 */
1921 @Override
1922 public boolean skipRegionManagementAction(final String action) {
1923 // Note: this method could be `default` on MasterServices were it not for logging.
1924 if (!isInitialized()) {
1925 LOG.debug("Master has not been initialized, don't run {}.", action);
1926 return true;
1927 }
1928 if (this.getServerManager().isClusterShutdown()) {
1929 LOG.info("Cluster is shutting down, don't run {}.", action);
1930 return true;
1931 }
1932 if (isInMaintenanceMode()) {
1933 LOG.info("Master is in maintenance mode, don't run {}.", action);
1934 return true;
1935 }
1936 return false;
1937 }
1938
1939 public BalanceResponse balance(BalanceRequest request) throws IOException {
1940 checkInitialized();
1941
1942 BalanceResponse.Builder responseBuilder = BalanceResponse.newBuilder();
1943
1944 if (
1945 loadBalancerTracker == null || !(loadBalancerTracker.isBalancerOn() || request.isDryRun())
1946 ) {
1947 return responseBuilder.build();
1948 }
1949
1950 if (skipRegionManagementAction("balancer")) {
1951 return responseBuilder.build();
1952 }
1953
1954 synchronized (this.balancer) {
1955 // Only allow one balance run at a time.
1956 if (this.assignmentManager.hasRegionsInTransition()) {
1957 List<RegionStateNode> regionsInTransition = assignmentManager.getRegionsInTransition();
1958 // if hbase:meta region is in transition, result of assignment cannot be recorded
1959 // ignore the force flag in that case
1960 boolean metaInTransition = assignmentManager.isMetaRegionInTransition();
1961 List<RegionStateNode> toPrint = regionsInTransition;
1962 int max = 5;
1963 boolean truncated = false;
1964 if (regionsInTransition.size() > max) {
1965 toPrint = regionsInTransition.subList(0, max);
1966 truncated = true;
1967 }
1968
1969 if (!request.isIgnoreRegionsInTransition() || metaInTransition) {
1970 LOG.info("Not running balancer (ignoreRIT=false" + ", metaRIT=" + metaInTransition
1971 + ") because " + regionsInTransition.size() + " region(s) in transition: " + toPrint
1972 + (truncated ? "(truncated list)" : ""));
1973 return responseBuilder.build();
1974 }
1975 }
1976 if (this.serverManager.areDeadServersInProgress()) {
1977 LOG.info("Not running balancer because processing dead regionserver(s): "
1978 + this.serverManager.getDeadServers());
1979 return responseBuilder.build();
1980 }
1981
1982 if (this.cpHost != null) {
1983 try {
1984 if (this.cpHost.preBalance(request)) {
1985 LOG.debug("Coprocessor bypassing balancer request");
1986 return responseBuilder.build();
1987 }
1988 } catch (IOException ioe) {
1989 LOG.error("Error invoking master coprocessor preBalance()", ioe);
1990 return responseBuilder.build();
1991 }
1992 }
1993
1994 Map<TableName, Map<ServerName, List<RegionInfo>>> assignments =
1995 this.assignmentManager.getRegionStates().getAssignmentsForBalancer(tableStateManager,
1996 this.serverManager.getOnlineServersList());
1997 for (Map<ServerName, List<RegionInfo>> serverMap : assignments.values()) {
1998 serverMap.keySet().removeAll(this.serverManager.getDrainingServersList());
1999 }
2000
2001 // Give the balancer the current cluster state.
2002 this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());
2003
2004 List<RegionPlan> plans = this.balancer.balanceCluster(assignments);
2005
2006 responseBuilder.setBalancerRan(true).setMovesCalculated(plans == null ? 0 : plans.size());
2007
2008 if (skipRegionManagementAction("balancer")) {
2009 // make one last check that the cluster isn't shutting down before proceeding.
2010 return responseBuilder.build();
2011 }
2012
2013 // For dry run we don't actually want to execute the moves, but we do want
2014 // to execute the coprocessor below
2015 List<RegionPlan> sucRPs =
2016 request.isDryRun() ? Collections.emptyList() : executeRegionPlansWithThrottling(plans);
2017
2018 if (this.cpHost != null) {
2019 try {
2020 this.cpHost.postBalance(request, sucRPs);
2021 } catch (IOException ioe) {
2022 // balancing already succeeded so don't change the result
2023 LOG.error("Error invoking master coprocessor postBalance()", ioe);
2024 }
2025 }
2026
2027 responseBuilder.setMovesExecuted(sucRPs.size());
2028 }
2029
2030 // If the LoadBalancer did not generate any plans, the cluster is already balanced;
2031 // return the response as built, with no moves recorded.
2032 return responseBuilder.build();
2033 }
2034
2035 /**
2036 * Execute region plans with throttling
2037 * @param plans to execute
2038 * @return succeeded plans
2039 */
2040 public List<RegionPlan> executeRegionPlansWithThrottling(List<RegionPlan> plans) {
2041 List<RegionPlan> successRegionPlans = new ArrayList<>();
2042 int maxRegionsInTransition = getMaxRegionsInTransition();
2043 long balanceStartTime = EnvironmentEdgeManager.currentTime();
2044 long cutoffTime = balanceStartTime + this.maxBalancingTime;
2045 int rpCount = 0; // number of RegionPlans balanced so far
2046 if (plans != null && !plans.isEmpty()) {
2047 int balanceInterval = this.maxBalancingTime / plans.size();
2048 LOG.info(
2049 "Balancer plans size is " + plans.size() + ", the balance interval is " + balanceInterval
2050 + " ms, and the max number regions in transition is " + maxRegionsInTransition);
2051
2052 for (RegionPlan plan : plans) {
2053 LOG.info("balance " + plan);
2054 // TODO: bulk assign
2055 try {
2056 this.assignmentManager.balance(plan);
2057 } catch (HBaseIOException hioe) {
2058 // Ignore failed plans here to avoid aborting the whole set of balance plans;
2059 // later calls of balance() can pick up the failed and skipped plans.
2060 LOG.warn("Failed balance plan {}, skipping...", plan, hioe);
2061 }
2062 // rpCount records balance plans processed; it does not care whether a plan succeeds
2063 rpCount++;
2064 successRegionPlans.add(plan);
2065
2066 if (this.maxBalancingTime > 0) {
2067 balanceThrottling(balanceStartTime + rpCount * balanceInterval, maxRegionsInTransition,
2068 cutoffTime);
2069 }
2070
2071 // if performing the next balance would exceed the cutoff time, exit the loop
2072 if (
2073 this.maxBalancingTime > 0 && rpCount < plans.size()
2074 && EnvironmentEdgeManager.currentTime() > cutoffTime
2075 ) {
2076 // TODO: After balance, there should not be a cutoff time (keeping it as
2077 // a safety net for now)
2078 LOG.debug(
2079 "No more balancing till next balance run; maxBalanceTime=" + this.maxBalancingTime);
2080 break;
2081 }
2082 }
2083 }
2084 LOG.debug("Balancer is going into sleep until next period in {}ms", getConfiguration()
2085 .getInt(HConstants.HBASE_BALANCER_PERIOD, HConstants.DEFAULT_HBASE_BALANCER_PERIOD));
2086 return successRegionPlans;
2087 }
2088
2089 @Override
2090 public RegionNormalizerManager getRegionNormalizerManager() {
2091 return regionNormalizerManager;
2092 }
2093
2094 @Override
2095 public boolean normalizeRegions(final NormalizeTableFilterParams ntfp,
2096 final boolean isHighPriority) throws IOException {
2097 if (regionNormalizerManager == null || !regionNormalizerManager.isNormalizerOn()) {
2098 LOG.debug("Region normalization is disabled, don't run region normalizer.");
2099 return false;
2100 }
2101 if (skipRegionManagementAction("region normalizer")) {
2102 return false;
2103 }
2104 if (assignmentManager.hasRegionsInTransition()) {
2105 return false;
2106 }
2107
2108 final Set<TableName> matchingTables = getTableDescriptors(new LinkedList<>(),
2109 ntfp.getNamespace(), ntfp.getRegex(), ntfp.getTableNames(), false).stream()
2110 .map(TableDescriptor::getTableName).collect(Collectors.toSet());
2111 final Set<TableName> allEnabledTables =
2112 tableStateManager.getTablesInStates(TableState.State.ENABLED);
2113 final List<TableName> targetTables =
2114 new ArrayList<>(Sets.intersection(matchingTables, allEnabledTables));
2115 Collections.shuffle(targetTables);
2116 return regionNormalizerManager.normalizeRegions(targetTables, isHighPriority);
2117 }
2118
2119 /**
Returns Client info for use as prefix on an audit log string; who did an action */ 2120 @Override 2121 public String getClientIdAuditPrefix() { 2122 return "Client=" + RpcServer.getRequestUserName().orElse(null) + "/" 2123 + RpcServer.getRemoteAddress().orElse(null); 2124 } 2125 2126 /** 2127 * Switch for the background CatalogJanitor thread. Used for testing. The thread will continue to 2128 * run. It will just be a noop if disabled. 2129 * @param b If false, the catalog janitor won't do anything. 2130 */ 2131 public void setCatalogJanitorEnabled(final boolean b) { 2132 this.catalogJanitorChore.setEnabled(b); 2133 } 2134 2135 @Override 2136 public long mergeRegions(final RegionInfo[] regionsToMerge, final boolean forcible, final long ng, 2137 final long nonce) throws IOException { 2138 checkInitialized(); 2139 2140 if (!isSplitOrMergeEnabled(MasterSwitchType.MERGE)) { 2141 String regionsStr = Arrays.deepToString(regionsToMerge); 2142 LOG.warn("Merge switch is off! skip merge of " + regionsStr); 2143 throw new DoNotRetryIOException( 2144 "Merge of " + regionsStr + " failed because merge switch is off"); 2145 } 2146 2147 final String mergeRegionsStr = Arrays.stream(regionsToMerge).map(RegionInfo::getEncodedName) 2148 .collect(Collectors.joining(", ")); 2149 return MasterProcedureUtil.submitProcedure(new NonceProcedureRunnable(this, ng, nonce) { 2150 @Override 2151 protected void run() throws IOException { 2152 getMaster().getMasterCoprocessorHost().preMergeRegions(regionsToMerge); 2153 String aid = getClientIdAuditPrefix(); 2154 LOG.info("{} merge regions {}", aid, mergeRegionsStr); 2155 submitProcedure(new MergeTableRegionsProcedure(procedureExecutor.getEnvironment(), 2156 regionsToMerge, forcible)); 2157 getMaster().getMasterCoprocessorHost().postMergeRegions(regionsToMerge); 2158 } 2159 2160 @Override 2161 protected String getDescription() { 2162 return "MergeTableProcedure"; 2163 } 2164 }); 2165 } 2166 2167 @Override 2168 public long splitRegion(final RegionInfo regionInfo, final byte[] splitRow, final long nonceGroup, 2169 final long nonce) throws IOException { 2170 checkInitialized(); 2171 2172 if (!isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) { 2173 LOG.warn("Split switch is off! skip split of " + regionInfo); 2174 throw new DoNotRetryIOException( 2175 "Split region " + regionInfo.getRegionNameAsString() + " failed due to split switch off"); 2176 } 2177 2178 return MasterProcedureUtil 2179 .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { 2180 @Override 2181 protected void run() throws IOException { 2182 getMaster().getMasterCoprocessorHost().preSplitRegion(regionInfo.getTable(), splitRow); 2183 LOG.info(getClientIdAuditPrefix() + " split " + regionInfo.getRegionNameAsString()); 2184 2185 // Execute the operation asynchronously 2186 submitProcedure(getAssignmentManager().createSplitProcedure(regionInfo, splitRow)); 2187 } 2188 2189 @Override 2190 protected String getDescription() { 2191 return "SplitTableProcedure"; 2192 } 2193 }); 2194 } 2195 2196 // Public so can be accessed by tests. Blocks until move is done. 2197 // Replace with an async implementation from which you can get 2198 // a success/failure result. 
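// Illustrative call (values hypothetical): passing a null destination lets the balancer
// pick a random server, e.g.
//   move(Bytes.toBytes(regionInfo.getEncodedName()), null);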
2199 @InterfaceAudience.Private 2200 public void move(final byte[] encodedRegionName, byte[] destServerName) throws HBaseIOException { 2201 RegionState regionState = 2202 assignmentManager.getRegionStates().getRegionState(Bytes.toString(encodedRegionName)); 2203 2204 RegionInfo hri; 2205 if (regionState != null) { 2206 hri = regionState.getRegion(); 2207 } else { 2208 throw new UnknownRegionException(Bytes.toStringBinary(encodedRegionName)); 2209 } 2210 2211 ServerName dest; 2212 List<ServerName> exclude = hri.getTable().isSystemTable() 2213 ? assignmentManager.getExcludedServersForSystemTable() 2214 : new ArrayList<>(1); 2215 if ( 2216 destServerName != null && exclude.contains(ServerName.valueOf(Bytes.toString(destServerName))) 2217 ) { 2218 LOG.info(Bytes.toString(encodedRegionName) + " can not move to " 2219 + Bytes.toString(destServerName) + " because the server is in exclude list"); 2220 destServerName = null; 2221 } 2222 if (destServerName == null || destServerName.length == 0) { 2223 LOG.info("Passed destination servername is null/empty so " + "choosing a server at random"); 2224 exclude.add(regionState.getServerName()); 2225 final List<ServerName> destServers = this.serverManager.createDestinationServersList(exclude); 2226 dest = balancer.randomAssignment(hri, destServers); 2227 if (dest == null) { 2228 LOG.debug("Unable to determine a plan to assign " + hri); 2229 return; 2230 } 2231 } else { 2232 ServerName candidate = ServerName.valueOf(Bytes.toString(destServerName)); 2233 dest = balancer.randomAssignment(hri, Lists.newArrayList(candidate)); 2234 if (dest == null) { 2235 LOG.debug("Unable to determine a plan to assign " + hri); 2236 return; 2237 } 2238 // TODO: What is this? I don't get it. 2239 if ( 2240 dest.equals(serverName) && balancer instanceof BaseLoadBalancer 2241 && !((BaseLoadBalancer) balancer).shouldBeOnMaster(hri) 2242 ) { 2243 // To avoid unnecessary region moving later by balancer. Don't put user 2244 // regions on master. 2245 LOG.debug("Skipping move of region " + hri.getRegionNameAsString() 2246 + " to avoid unnecessary region moving later by load balancer," 2247 + " because it should not be on master"); 2248 return; 2249 } 2250 } 2251 2252 if (dest.equals(regionState.getServerName())) { 2253 LOG.debug("Skipping move of region " + hri.getRegionNameAsString() 2254 + " because region already assigned to the same server " + dest + "."); 2255 return; 2256 } 2257 2258 // Now we can do the move 2259 RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest); 2260 assert rp.getDestination() != null : rp.toString() + " " + dest; 2261 2262 try { 2263 checkInitialized(); 2264 if (this.cpHost != null) { 2265 this.cpHost.preMove(hri, rp.getSource(), rp.getDestination()); 2266 } 2267 2268 TransitRegionStateProcedure proc = 2269 this.assignmentManager.createMoveRegionProcedure(rp.getRegionInfo(), rp.getDestination()); 2270 if (conf.getBoolean(WARMUP_BEFORE_MOVE, DEFAULT_WARMUP_BEFORE_MOVE)) { 2271 // Warmup the region on the destination before initiating the move. this call 2272 // is synchronous and takes some time. doing it before the source region gets 2273 // closed 2274 LOG.info(getClientIdAuditPrefix() + " move " + rp + ", warming up region on " 2275 + rp.getDestination()); 2276 serverManager.sendRegionWarmup(rp.getDestination(), hri); 2277 } 2278 LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer"); 2279 Future<byte[]> future = ProcedureSyncWait.submitProcedure(this.procedureExecutor, proc); 2280 try { 2281 // Is this going to work? 
Will we throw exception on error? 2282 // TODO: CompletableFuture rather than this stunted Future. 2283 future.get(); 2284 } catch (InterruptedException | ExecutionException e) { 2285 throw new HBaseIOException(e); 2286 } 2287 if (this.cpHost != null) { 2288 this.cpHost.postMove(hri, rp.getSource(), rp.getDestination()); 2289 } 2290 } catch (IOException ioe) { 2291 if (ioe instanceof HBaseIOException) { 2292 throw (HBaseIOException) ioe; 2293 } 2294 throw new HBaseIOException(ioe); 2295 } 2296 } 2297 2298 @Override 2299 public long createTable(final TableDescriptor tableDescriptor, final byte[][] splitKeys, 2300 final long nonceGroup, final long nonce) throws IOException { 2301 checkInitialized(); 2302 TableDescriptor desc = getMasterCoprocessorHost().preCreateTableRegionsInfos(tableDescriptor); 2303 if (desc == null) { 2304 throw new IOException("Creation for " + tableDescriptor + " is canceled by CP"); 2305 } 2306 String namespace = desc.getTableName().getNamespaceAsString(); 2307 this.clusterSchemaService.getNamespace(namespace); 2308 2309 RegionInfo[] newRegions = ModifyRegionUtils.createRegionInfos(desc, splitKeys); 2310 TableDescriptorChecker.sanityCheck(conf, desc); 2311 2312 return MasterProcedureUtil 2313 .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { 2314 @Override 2315 protected void run() throws IOException { 2316 getMaster().getMasterCoprocessorHost().preCreateTable(desc, newRegions); 2317 2318 LOG.info(getClientIdAuditPrefix() + " create " + desc); 2319 2320 // TODO: We can handle/merge duplicate requests, and differentiate the case of 2321 // TableExistsException by saying if the schema is the same or not. 2322 // 2323 // We need to wait for the procedure to potentially fail due to "prepare" sanity 2324 // checks. This will block only the beginning of the procedure. See HBASE-19953. 2325 ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch(); 2326 submitProcedure( 2327 new CreateTableProcedure(procedureExecutor.getEnvironment(), desc, newRegions, latch)); 2328 latch.await(); 2329 2330 getMaster().getMasterCoprocessorHost().postCreateTable(desc, newRegions); 2331 } 2332 2333 @Override 2334 protected String getDescription() { 2335 return "CreateTableProcedure"; 2336 } 2337 }); 2338 } 2339 2340 @Override 2341 public long createSystemTable(final TableDescriptor tableDescriptor) throws IOException { 2342 if (isStopped()) { 2343 throw new MasterNotRunningException(); 2344 } 2345 2346 TableName tableName = tableDescriptor.getTableName(); 2347 if (!(tableName.isSystemTable())) { 2348 throw new IllegalArgumentException( 2349 "Only system table creation can use this createSystemTable API"); 2350 } 2351 2352 RegionInfo[] newRegions = ModifyRegionUtils.createRegionInfos(tableDescriptor, null); 2353 2354 LOG.info(getClientIdAuditPrefix() + " create " + tableDescriptor); 2355 2356 // This special create table is called locally to master. Therefore, no RPC means no need 2357 // to use nonce to detect duplicated RPC call. 
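// (Contrast createTable() above, which runs through MasterProcedureUtil with a nonce
// group/nonce pair so that duplicated client RPC retries can be detected and collapsed.)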
2358 long procId = this.procedureExecutor.submitProcedure(
2359 new CreateTableProcedure(procedureExecutor.getEnvironment(), tableDescriptor, newRegions));
2360
2361 return procId;
2362 }
2363
2364 private void startActiveMasterManager(int infoPort) throws KeeperException {
2365 String backupZNode = ZNodePaths.joinZNode(zooKeeper.getZNodePaths().backupMasterAddressesZNode,
2366 serverName.toString());
2367 /*
2368 * Add a ZNode for ourselves in the backup master directory since we may not become the active
2369 * master. If so, we want the actual active master to know we are backup masters, so that it
2370 * won't assign regions to us if so configured. If we become the active master later,
2371 * ActiveMasterManager will delete this node explicitly. If we crash before then, ZooKeeper will
2372 * delete this node for us since it is ephemeral.
2373 */
2374 LOG.info("Adding backup master ZNode " + backupZNode);
2375 if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode, serverName, infoPort)) {
2376 LOG.warn("Failed create of " + backupZNode + " by " + serverName);
2377 }
2378 this.activeMasterManager.setInfoPort(infoPort);
2379 int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
2380 // If we're a backup master, stall until the primary writes its address
2381 if (conf.getBoolean(HConstants.MASTER_TYPE_BACKUP, HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
2382 LOG.debug("HMaster started in backup mode. Stalling until master znode is written.");
2383 // This will only be a minute or so while the cluster starts up,
2384 // so don't worry about setting watches on the parent znode
2385 while (!activeMasterManager.hasActiveMaster()) {
2386 LOG.debug("Waiting for master address and cluster state znode to be written.");
2387 Threads.sleep(timeout);
2388 }
2389 }
2390 MonitoredTask status = TaskMonitor.get().createStatus("Master startup");
2391 status.setDescription("Master startup");
2392 try {
2393 if (activeMasterManager.blockUntilBecomingActiveMaster(timeout, status)) {
2394 finishActiveMasterInitialization(status);
2395 }
2396 } catch (Throwable t) {
2397 status.setStatus("Failed to become active: " + t.getMessage());
2398 LOG.error(HBaseMarkers.FATAL, "Failed to become active master", t);
2399 // HBASE-5680: Likely hadoop23 vs hadoop 20.x/1.x incompatibility
2400 if (
2401 t instanceof NoClassDefFoundError
2402 && t.getMessage().contains("org/apache/hadoop/hdfs/protocol/HdfsConstants$SafeModeAction")
2403 ) {
2404 // improved error message for this special case
2405 abort("HBase is having a problem with its Hadoop jars. You may need to recompile "
2406 + "HBase against Hadoop version " + org.apache.hadoop.util.VersionInfo.getVersion()
2407 + " or change your hadoop jars to start properly", t);
2408 } else {
2409 abort("Unhandled exception. 
Starting shutdown.", t); 2410 } 2411 } finally { 2412 status.cleanup(); 2413 } 2414 } 2415 2416 private static boolean isCatalogTable(final TableName tableName) { 2417 return tableName.equals(TableName.META_TABLE_NAME); 2418 } 2419 2420 @Override 2421 public long deleteTable(final TableName tableName, final long nonceGroup, final long nonce) 2422 throws IOException { 2423 checkInitialized(); 2424 2425 return MasterProcedureUtil 2426 .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { 2427 @Override 2428 protected void run() throws IOException { 2429 getMaster().getMasterCoprocessorHost().preDeleteTable(tableName); 2430 2431 LOG.info(getClientIdAuditPrefix() + " delete " + tableName); 2432 2433 // TODO: We can handle/merge duplicate request 2434 // 2435 // We need to wait for the procedure to potentially fail due to "prepare" sanity 2436 // checks. This will block only the beginning of the procedure. See HBASE-19953. 2437 ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch(); 2438 submitProcedure( 2439 new DeleteTableProcedure(procedureExecutor.getEnvironment(), tableName, latch)); 2440 latch.await(); 2441 2442 getMaster().getMasterCoprocessorHost().postDeleteTable(tableName); 2443 } 2444 2445 @Override 2446 protected String getDescription() { 2447 return "DeleteTableProcedure"; 2448 } 2449 }); 2450 } 2451 2452 @Override 2453 public long truncateTable(final TableName tableName, final boolean preserveSplits, 2454 final long nonceGroup, final long nonce) throws IOException { 2455 checkInitialized(); 2456 2457 return MasterProcedureUtil 2458 .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { 2459 @Override 2460 protected void run() throws IOException { 2461 getMaster().getMasterCoprocessorHost().preTruncateTable(tableName); 2462 2463 LOG.info(getClientIdAuditPrefix() + " truncate " + tableName); 2464 ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0); 2465 submitProcedure(new TruncateTableProcedure(procedureExecutor.getEnvironment(), tableName, 2466 preserveSplits, latch)); 2467 latch.await(); 2468 2469 getMaster().getMasterCoprocessorHost().postTruncateTable(tableName); 2470 } 2471 2472 @Override 2473 protected String getDescription() { 2474 return "TruncateTableProcedure"; 2475 } 2476 }); 2477 } 2478 2479 @Override 2480 public long addColumn(final TableName tableName, final ColumnFamilyDescriptor column, 2481 final long nonceGroup, final long nonce) throws IOException { 2482 checkInitialized(); 2483 checkTableExists(tableName); 2484 2485 return modifyTable(tableName, new TableDescriptorGetter() { 2486 2487 @Override 2488 public TableDescriptor get() throws IOException { 2489 TableDescriptor old = getTableDescriptors().get(tableName); 2490 if (old.hasColumnFamily(column.getName())) { 2491 throw new InvalidFamilyOperationException("Column family '" + column.getNameAsString() 2492 + "' in table '" + tableName + "' already exists so cannot be added"); 2493 } 2494 2495 return TableDescriptorBuilder.newBuilder(old).setColumnFamily(column).build(); 2496 } 2497 }, nonceGroup, nonce, true); 2498 } 2499 2500 /** 2501 * Implement to return TableDescriptor after pre-checks 2502 */ 2503 protected interface TableDescriptorGetter { 2504 TableDescriptor get() throws IOException; 2505 } 2506 2507 @Override 2508 public long modifyColumn(final TableName tableName, final ColumnFamilyDescriptor descriptor, 2509 final long nonceGroup, final long nonce) throws IOException { 2510 checkInitialized(); 
2511 checkTableExists(tableName); 2512 return modifyTable(tableName, new TableDescriptorGetter() { 2513 2514 @Override 2515 public TableDescriptor get() throws IOException { 2516 TableDescriptor old = getTableDescriptors().get(tableName); 2517 if (!old.hasColumnFamily(descriptor.getName())) { 2518 throw new InvalidFamilyOperationException("Family '" + descriptor.getNameAsString() 2519 + "' does not exist, so it cannot be modified"); 2520 } 2521 2522 return TableDescriptorBuilder.newBuilder(old).modifyColumnFamily(descriptor).build(); 2523 } 2524 }, nonceGroup, nonce, true); 2525 } 2526 2527 @Override 2528 public long modifyColumnStoreFileTracker(TableName tableName, byte[] family, String dstSFT, 2529 long nonceGroup, long nonce) throws IOException { 2530 checkInitialized(); 2531 return MasterProcedureUtil 2532 .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { 2533 2534 @Override 2535 protected void run() throws IOException { 2536 String sft = getMaster().getMasterCoprocessorHost() 2537 .preModifyColumnFamilyStoreFileTracker(tableName, family, dstSFT); 2538 LOG.info("{} modify column {} store file tracker of table {} to {}", 2539 getClientIdAuditPrefix(), Bytes.toStringBinary(family), tableName, sft); 2540 submitProcedure(new ModifyColumnFamilyStoreFileTrackerProcedure( 2541 procedureExecutor.getEnvironment(), tableName, family, sft)); 2542 getMaster().getMasterCoprocessorHost().postModifyColumnFamilyStoreFileTracker(tableName, 2543 family, dstSFT); 2544 } 2545 2546 @Override 2547 protected String getDescription() { 2548 return "ModifyColumnFamilyStoreFileTrackerProcedure"; 2549 } 2550 }); 2551 } 2552 2553 @Override 2554 public long deleteColumn(final TableName tableName, final byte[] columnName, 2555 final long nonceGroup, final long nonce) throws IOException { 2556 checkInitialized(); 2557 checkTableExists(tableName); 2558 2559 return modifyTable(tableName, new TableDescriptorGetter() { 2560 2561 @Override 2562 public TableDescriptor get() throws IOException { 2563 TableDescriptor old = getTableDescriptors().get(tableName); 2564 2565 if (!old.hasColumnFamily(columnName)) { 2566 throw new InvalidFamilyOperationException( 2567 "Family '" + Bytes.toString(columnName) + "' does not exist, so it cannot be deleted"); 2568 } 2569 if (old.getColumnFamilyCount() == 1) { 2570 throw new InvalidFamilyOperationException("Family '" + Bytes.toString(columnName) 2571 + "' is the only column family in the table, so it cannot be deleted"); 2572 } 2573 return TableDescriptorBuilder.newBuilder(old).removeColumnFamily(columnName).build(); 2574 } 2575 }, nonceGroup, nonce, true); 2576 } 2577 2578 @Override 2579 public long enableTable(final TableName tableName, final long nonceGroup, final long nonce) 2580 throws IOException { 2581 checkInitialized(); 2582 2583 return MasterProcedureUtil 2584 .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { 2585 @Override 2586 protected void run() throws IOException { 2587 getMaster().getMasterCoprocessorHost().preEnableTable(tableName); 2588 2589 // Normally, it would make sense for this authorization check to exist inside 2590 // AccessController, but because the authorization check is done based on internal state 2591 // (rather than explicit permissions) we'll do the check here instead of in the 2592 // coprocessor. 
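// In other words: if the table was disabled automatically because it violated a space
// quota whose violation policy is DISABLE, a manual enable is refused below until the
// violation clears.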
2593 MasterQuotaManager quotaManager = getMasterQuotaManager();
2594 if (quotaManager != null) {
2595 if (quotaManager.isQuotaInitialized()) {
2596 SpaceQuotaSnapshot currSnapshotOfTable =
2597 QuotaTableUtil.getCurrentSnapshotFromQuotaTable(getConnection(), tableName);
2598 if (currSnapshotOfTable != null) {
2599 SpaceQuotaStatus quotaStatus = currSnapshotOfTable.getQuotaStatus();
2600 if (
2601 quotaStatus.isInViolation()
2602 && SpaceViolationPolicy.DISABLE == quotaStatus.getPolicy().orElse(null)
2603 ) {
2604 throw new AccessDeniedException("Enabling the table '" + tableName
2605 + "' is disallowed due to a violated space quota.");
2606 }
2607 }
2608 } else if (LOG.isTraceEnabled()) {
2609 LOG
2610 .trace("Unable to check for space quotas as the MasterQuotaManager is not enabled");
2611 }
2612 }
2613
2614 LOG.info(getClientIdAuditPrefix() + " enable " + tableName);
2615
2616 // Execute the operation asynchronously - client will check the progress of the operation
2617 // In case the request is from a pre-1.1 client, before returning
2618 // we want to make sure that the table is prepared to be
2619 // enabled (the table is locked and the table state is set).
2620 // Note: if the procedure throws exception, we will catch it and rethrow.
2621 final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
2622 submitProcedure(
2623 new EnableTableProcedure(procedureExecutor.getEnvironment(), tableName, prepareLatch));
2624 prepareLatch.await();
2625
2626 getMaster().getMasterCoprocessorHost().postEnableTable(tableName);
2627 }
2628
2629 @Override
2630 protected String getDescription() {
2631 return "EnableTableProcedure";
2632 }
2633 });
2634 }
2635
2636 @Override
2637 public long disableTable(final TableName tableName, final long nonceGroup, final long nonce)
2638 throws IOException {
2639 checkInitialized();
2640
2641 return MasterProcedureUtil
2642 .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2643 @Override
2644 protected void run() throws IOException {
2645 getMaster().getMasterCoprocessorHost().preDisableTable(tableName);
2646
2647 LOG.info(getClientIdAuditPrefix() + " disable " + tableName);
2648
2649 // Execute the operation asynchronously - client will check the progress of the operation
2650 // In case the request is from a pre-1.1 client, before returning
2651 // we want to make sure that the table is prepared to be
2652 // disabled (the table is locked and the table state is set).
2653 // Note: if the procedure throws exception, we will catch it and rethrow.
2654 //
2655 // We need to wait for the procedure to potentially fail due to "prepare" sanity
2656 // checks. This will block only the beginning of the procedure. See HBASE-19953.
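// (createBlockingLatch() always hands back a real latch, so every caller waits out the
// prepare step; compare ProcedurePrepareLatch.createLatch(2, 0) used by truncateTable
// above, which appears to create a real latch only for clients older than the given
// version.)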
2657 final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createBlockingLatch();
2658 submitProcedure(new DisableTableProcedure(procedureExecutor.getEnvironment(), tableName,
2659 false, prepareLatch));
2660 prepareLatch.await();
2661
2662 getMaster().getMasterCoprocessorHost().postDisableTable(tableName);
2663 }
2664
2665 @Override
2666 protected String getDescription() {
2667 return "DisableTableProcedure";
2668 }
2669 });
2670 }
2671
2672 private long modifyTable(final TableName tableName,
2673 final TableDescriptorGetter newDescriptorGetter, final long nonceGroup, final long nonce,
2674 final boolean shouldCheckDescriptor) throws IOException {
2675 return MasterProcedureUtil
2676 .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2677 @Override
2678 protected void run() throws IOException {
2679 TableDescriptor oldDescriptor = getMaster().getTableDescriptors().get(tableName);
2680 TableDescriptor newDescriptor = getMaster().getMasterCoprocessorHost()
2681 .preModifyTable(tableName, oldDescriptor, newDescriptorGetter.get());
2682 TableDescriptorChecker.sanityCheck(conf, newDescriptor);
2683 LOG.info("{} modify table {} from {} to {}", getClientIdAuditPrefix(), tableName,
2684 oldDescriptor, newDescriptor);
2685
2686 // Execute the operation synchronously - wait for the operation to complete before
2687 // continuing.
2688 //
2689 // We need to wait for the procedure to potentially fail due to "prepare" sanity
2690 // checks. This will block only the beginning of the procedure. See HBASE-19953.
2691 ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
2692 submitProcedure(new ModifyTableProcedure(procedureExecutor.getEnvironment(),
2693 newDescriptor, latch, oldDescriptor, shouldCheckDescriptor));
2694 latch.await();
2695
2696 getMaster().getMasterCoprocessorHost().postModifyTable(tableName, oldDescriptor,
2697 newDescriptor);
2698 }
2699
2700 @Override
2701 protected String getDescription() {
2702 return "ModifyTableProcedure";
2703 }
2704 });
2705
2706 }
2707
2708 @Override
2709 public long modifyTable(final TableName tableName, final TableDescriptor newDescriptor,
2710 final long nonceGroup, final long nonce) throws IOException {
2711 checkInitialized();
2712 return modifyTable(tableName, new TableDescriptorGetter() {
2713 @Override
2714 public TableDescriptor get() throws IOException {
2715 return newDescriptor;
2716 }
2717 }, nonceGroup, nonce, false);
2718
2719 }
2720
2721 @Override
2722 public long modifyTableStoreFileTracker(TableName tableName, String dstSFT, long nonceGroup,
2723 long nonce) throws IOException {
2724 checkInitialized();
2725 return MasterProcedureUtil
2726 .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2727
2728 @Override
2729 protected void run() throws IOException {
2730 String sft = getMaster().getMasterCoprocessorHost()
2731 .preModifyTableStoreFileTracker(tableName, dstSFT);
2732 LOG.info("{} modify table store file tracker of table {} to {}", getClientIdAuditPrefix(),
2733 tableName, sft);
2734 submitProcedure(new ModifyTableStoreFileTrackerProcedure(
2735 procedureExecutor.getEnvironment(), tableName, sft));
2736 getMaster().getMasterCoprocessorHost().postModifyTableStoreFileTracker(tableName, sft);
2737 }
2738
2739 @Override
2740 protected String getDescription() {
2741 return "ModifyTableStoreFileTrackerProcedure";
2742 }
2743 });
2744 }
2745
2746 public long restoreSnapshot(final SnapshotDescription snapshotDesc, final long nonceGroup,
2747 final long nonce, final boolean restoreAcl, final String customSFT) throws IOException {
2748 checkInitialized();
2749 getSnapshotManager().checkSnapshotSupport();
2750
2751 // Ensure the namespace exists. This will throw an exception for an unknown namespace.
2752 final TableName dstTable = TableName.valueOf(snapshotDesc.getTable());
2753 getClusterSchema().getNamespace(dstTable.getNamespaceAsString());
2754
2755 return MasterProcedureUtil
2756 .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2757 @Override
2758 protected void run() throws IOException {
2759 setProcId(getSnapshotManager().restoreOrCloneSnapshot(snapshotDesc, getNonceKey(),
2760 restoreAcl, customSFT));
2761 }
2762
2763 @Override
2764 protected String getDescription() {
2765 return "RestoreSnapshotProcedure";
2766 }
2767 });
2768 }
2769
2770 private void checkTableExists(final TableName tableName)
2771 throws IOException, TableNotFoundException {
2772 if (!tableDescriptors.exists(tableName)) {
2773 throw new TableNotFoundException(tableName);
2774 }
2775 }
2776
2777 @Override
2778 public void checkTableModifiable(final TableName tableName)
2779 throws IOException, TableNotFoundException, TableNotDisabledException {
2780 if (isCatalogTable(tableName)) {
2781 throw new IOException("Can't modify catalog tables");
2782 }
2783 checkTableExists(tableName);
2784 TableState ts = getTableStateManager().getTableState(tableName);
2785 if (!ts.isDisabled()) {
2786 throw new TableNotDisabledException("Not DISABLED; " + ts);
2787 }
2788 }
2789
2790 public ClusterMetrics getClusterMetricsWithoutCoprocessor() throws InterruptedIOException {
2791 return getClusterMetricsWithoutCoprocessor(EnumSet.allOf(Option.class));
2792 }
2793
2794 public ClusterMetrics getClusterMetricsWithoutCoprocessor(EnumSet<Option> options)
2795 throws InterruptedIOException {
2796 ClusterMetricsBuilder builder = ClusterMetricsBuilder.newBuilder();
2797 // Given that HBase 1.x clients can't submit the request with Options,
2798 // we return all information to the client if the list of Options is empty.
2799 if (options.isEmpty()) {
2800 options = EnumSet.allOf(Option.class);
2801 }
2802
2803 // TASKS and/or LIVE_SERVERS will populate this map, which will be given to the builder if
2804 // not null after option processing completes.
2805 Map<ServerName, ServerMetrics> serverMetricsMap = null;
2806
2807 for (Option opt : options) {
2808 switch (opt) {
2809 case HBASE_VERSION:
2810 builder.setHBaseVersion(VersionInfo.getVersion());
2811 break;
2812 case CLUSTER_ID:
2813 builder.setClusterId(getClusterId());
2814 break;
2815 case MASTER:
2816 builder.setMasterName(getServerName());
2817 break;
2818 case BACKUP_MASTERS:
2819 builder.setBackerMasterNames(getBackupMasters());
2820 break;
2821 case TASKS: {
2822 // Master tasks
2823 builder.setMasterTasks(TaskMonitor.get().getTasks().stream()
2824 .map(task -> ServerTaskBuilder.newBuilder().setDescription(task.getDescription())
2825 .setStatus(task.getStatus())
2826 .setState(ServerTask.State.valueOf(task.getState().name()))
2827 .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTimestamp())
2828 .build())
2829 .collect(Collectors.toList()));
2830 // TASKS is also synonymous with LIVE_SERVERS for now because task information for
2831 // regionservers is carried in ServerLoad.
2832 // Add entries to serverMetricsMap for all live servers, if we haven't already done so
2833 if (serverMetricsMap == null) {
2834 serverMetricsMap = getOnlineServers();
2835 }
2836 break;
2837 }
2838 case LIVE_SERVERS: {
2839 // Add entries to serverMetricsMap for all live servers, if we haven't already done so
2840 if (serverMetricsMap == null) {
2841 serverMetricsMap = getOnlineServers();
2842 }
2843 break;
2844 }
2845 case DEAD_SERVERS: {
2846 if (serverManager != null) {
2847 builder.setDeadServerNames(
2848 new ArrayList<>(serverManager.getDeadServers().copyServerNames()));
2849 }
2850 break;
2851 }
2852 case UNKNOWN_SERVERS: {
2853 if (serverManager != null) {
2854 builder.setUnknownServerNames(getUnknownServers());
2855 }
2856 break;
2857 }
2858 case MASTER_COPROCESSORS: {
2859 if (cpHost != null) {
2860 builder.setMasterCoprocessorNames(Arrays.asList(getMasterCoprocessors()));
2861 }
2862 break;
2863 }
2864 case REGIONS_IN_TRANSITION: {
2865 if (assignmentManager != null) {
2866 builder.setRegionsInTransition(
2867 assignmentManager.getRegionStates().getRegionsStateInTransition());
2868 }
2869 break;
2870 }
2871 case BALANCER_ON: {
2872 if (loadBalancerTracker != null) {
2873 builder.setBalancerOn(loadBalancerTracker.isBalancerOn());
2874 }
2875 break;
2876 }
2877 case MASTER_INFO_PORT: {
2878 if (infoServer != null) {
2879 builder.setMasterInfoPort(infoServer.getPort());
2880 }
2881 break;
2882 }
2883 case SERVERS_NAME: {
2884 if (serverManager != null) {
2885 builder.setServerNames(serverManager.getOnlineServersList());
2886 }
2887 break;
2888 }
2889 case TABLE_TO_REGIONS_COUNT: {
2890 if (isActiveMaster() && isInitialized() && assignmentManager != null) {
2891 try {
2892 Map<TableName, RegionStatesCount> tableRegionStatesCountMap = new HashMap<>();
2893 Map<String, TableDescriptor> tableDescriptorMap = getTableDescriptors().getAll();
2894 for (TableDescriptor tableDescriptor : tableDescriptorMap.values()) {
2895 TableName tableName = tableDescriptor.getTableName();
2896 RegionStatesCount regionStatesCount =
2897 assignmentManager.getRegionStatesCount(tableName);
2898 tableRegionStatesCountMap.put(tableName, regionStatesCount);
2899 }
2900 builder.setTableRegionStatesCount(tableRegionStatesCountMap);
2901 } catch (IOException e) {
2902 LOG.error("Error while populating TABLE_TO_REGIONS_COUNT for Cluster Metrics.", e);
2903 }
2904 }
2905 break;
2906 }
2907 }
2908 }
2909
2910 if (serverMetricsMap != null) {
2911 builder.setLiveServerMetrics(serverMetricsMap);
2912 }
2913
2914 return builder.build();
2915 }
2916
2917 private List<ServerName> getUnknownServers() {
2918 if (serverManager != null) {
2919 final Set<ServerName> serverNames = getAssignmentManager().getRegionStates().getRegionStates()
2920 .stream().map(RegionState::getServerName).collect(Collectors.toSet());
2921 final List<ServerName> unknownServerNames = serverNames.stream()
2922 .filter(sn -> sn != null && serverManager.isServerUnknown(sn)).collect(Collectors.toList());
2923 return unknownServerNames;
2924 }
2925 return null;
2926 }
2927
2928 private Map<ServerName, ServerMetrics> getOnlineServers() {
2929 if (serverManager != null) {
2930 final Map<ServerName, ServerMetrics> map = new HashMap<>();
2931 serverManager.getOnlineServers().entrySet().forEach(e -> map.put(e.getKey(), e.getValue()));
2932 return map;
2933 }
2934 return null;
2935 }
2936
2937 /** Returns cluster status */
2938 public ClusterMetrics getClusterMetrics() throws IOException {
2939 return getClusterMetrics(EnumSet.allOf(Option.class));
2940 }
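// Illustrative usage sketch (not part of the original source): callers that only need a
// few fields can pass a narrow option set to the EnumSet overload below and avoid the
// cost of building the full live-server metrics, e.g.:
//   ClusterMetrics metrics =
//     master.getClusterMetrics(EnumSet.of(Option.HBASE_VERSION, Option.BALANCER_ON));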
2941
2942 public ClusterMetrics getClusterMetrics(EnumSet<Option> options) throws IOException {
2943 if (cpHost != null) {
2944 cpHost.preGetClusterMetrics();
2945 }
2946 ClusterMetrics status = getClusterMetricsWithoutCoprocessor(options);
2947 if (cpHost != null) {
2948 cpHost.postGetClusterMetrics(status);
2949 }
2950 return status;
2951 }
2952
2953 @Override
2954 public Optional<ServerName> getActiveMaster() {
2955 return activeMasterManager.getActiveMasterServerName();
2956 }
2957
2958 @Override
2959 public List<ServerName> getBackupMasters() {
2960 return activeMasterManager.getBackupMasters();
2961 }
2962
2963 /** Returns info port of active master or 0 if any exception occurs. */
2964 public int getActiveMasterInfoPort() {
2965 return activeMasterManager.getActiveMasterInfoPort();
2966 }
2967
2968 /**
2969 * @param sn ServerName of the backup master
2970 * @return info port of backup master or 0 if any exception occurs.
2971 */
2972 public int getBackupMasterInfoPort(final ServerName sn) {
2973 return activeMasterManager.getBackupMasterInfoPort(sn);
2974 }
2975
2976 @Override
2977 public Iterator<ServerName> getRegionServers() {
2978 return regionServerTracker.getRegionServers().iterator();
2979 }
2980
2981 /**
2982 * The set of loaded coprocessors is stored in a static set. Since it's statically allocated, it
2983 * does not require that HMaster's cpHost be initialized prior to accessing it.
2984 * @return a String representation of the set of names of the loaded coprocessors.
2985 */
2986 public static String getLoadedCoprocessors() {
2987 return CoprocessorHost.getLoadedCoprocessors().toString();
2988 }
2989
2990 /** Returns timestamp in millis when HMaster was started. */
2991 public long getMasterStartTime() {
2992 return startcode;
2993 }
2994
2995 /** Returns timestamp in millis when HMaster became the active master. */
2996 public long getMasterActiveTime() {
2997 return masterActiveTime;
2998 }
2999
3000 /** Returns timestamp in millis when HMaster finished becoming the active master. */
3001 public long getMasterFinishedInitializationTime() {
3002 return masterFinishedInitializationTime;
3003 }
3004
3005 public int getNumWALFiles() {
3006 return 0;
3007 }
3008
3009 public ProcedureStore getProcedureStore() {
3010 return procedureStore;
3011 }
3012
3013 public int getRegionServerInfoPort(final ServerName sn) {
3014 int port = this.serverManager.getInfoPort(sn);
3015 return port == 0
3016 ? conf.getInt(HConstants.REGIONSERVER_INFO_PORT, HConstants.DEFAULT_REGIONSERVER_INFOPORT)
3017 : port;
3018 }
3019
3020 @Override
3021 public String getRegionServerVersion(ServerName sn) {
3022 // Will return "0.0.0" if the server is not online, to prevent moving system regions to an
3023 // RS of unknown version.
3024 return this.serverManager.getVersion(sn);
3025 }
3026
3027 @Override
3028 public void checkIfShouldMoveSystemRegionAsync() {
3029 assignmentManager.checkIfShouldMoveSystemRegionAsync();
3030 }
3031
3032 /** Returns array of coprocessor SimpleNames. */
3033 public String[] getMasterCoprocessors() {
3034 Set<String> masterCoprocessors = getMasterCoprocessorHost().getCoprocessors();
3035 return masterCoprocessors.toArray(new String[masterCoprocessors.size()]);
3036 }
3037
3038 @Override
3039 public void abort(String reason, Throwable cause) {
3040 if (!setAbortRequested() || isStopped()) {
3041 LOG.debug("Abort called but aborted={}, stopped={}", isAborted(), isStopped());
3042 return;
3043 }
3044 if (cpHost != null) {
3045 // HBASE-4014: dump a list of loaded coprocessors.
3046 LOG.error(HBaseMarkers.FATAL,
3047 "Master server abort: loaded coprocessors are: " + getLoadedCoprocessors());
3048 }
3049 String msg = "***** ABORTING master " + this + ": " + reason + " *****";
3050 if (cause != null) {
3051 LOG.error(HBaseMarkers.FATAL, msg, cause);
3052 } else {
3053 LOG.error(HBaseMarkers.FATAL, msg);
3054 }
3055
3056 try {
3057 stopMaster();
3058 } catch (IOException e) {
3059 LOG.error("Exception occurred while stopping master", e);
3060 }
3061 }
3062
3063 @Override
3064 public ZKWatcher getZooKeeper() {
3065 return zooKeeper;
3066 }
3067
3068 @Override
3069 public MasterCoprocessorHost getMasterCoprocessorHost() {
3070 return cpHost;
3071 }
3072
3073 @Override
3074 public MasterQuotaManager getMasterQuotaManager() {
3075 return quotaManager;
3076 }
3077
3078 @Override
3079 public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
3080 return procedureExecutor;
3081 }
3082
3083 @Override
3084 public ServerName getServerName() {
3085 return this.serverName;
3086 }
3087
3088 @Override
3089 public AssignmentManager getAssignmentManager() {
3090 return this.assignmentManager;
3091 }
3092
3093 @Override
3094 public CatalogJanitor getCatalogJanitor() {
3095 return this.catalogJanitorChore;
3096 }
3097
3098 public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
3099 return rsFatals;
3100 }
3101
3102 /**
3103 * Shutdown the cluster. Master runs a coordinated stop of all RegionServers and then itself.
3104 */
3105 public void shutdown() throws IOException {
3106 TraceUtil.trace(() -> {
3107 if (cpHost != null) {
3108 cpHost.preShutdown();
3109 }
3110
3111 // Tell the ServerManager that cluster shutdown has been called. This makes it so that when
3112 // the Master is the last running server, it'll stop itself. Next, we broadcast the cluster
3113 // shutdown by setting the cluster status as down. RegionServers will notice this change in
3114 // state and will start shutting themselves down. When the last one has exited, the Master
3115 if (this.serverManager != null) {
3116 this.serverManager.shutdownCluster();
3117 }
3118 if (this.clusterStatusTracker != null) {
3119 try {
3120 this.clusterStatusTracker.setClusterDown();
3121 } catch (KeeperException e) {
3122 LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
3123 }
3124 }
3125 // Stop the procedure executor. Will stop any ongoing assign, unassign, server crash etc.,
3126 // processing so we can go down.
3127 if (this.procedureExecutor != null) {
3128 this.procedureExecutor.stop();
3129 }
3130 // Shutdown our cluster connection. This will kill any hosted RPCs that might be going on;
3131 // this is what we want especially if the Master is in startup phase doing call outs to
3132 // hbase:meta, etc. when cluster is down. Without this connection close, we'd have to wait on
3133 // the rpc to timeout.
3134 if (this.clusterConnection != null) {
3135 this.clusterConnection.close();
3136 }
3137 }, "HMaster.shutdown");
3138 }
3139
3140 public void stopMaster() throws IOException {
3141 if (cpHost != null) {
3142 cpHost.preStopMaster();
3143 }
3144 stop("Stopped by " + Thread.currentThread().getName());
3145 }
3146
3147 @Override
3148 public void stop(String msg) {
3149 if (!isStopped()) {
3150 super.stop(msg);
3151 if (this.activeMasterManager != null) {
3152 this.activeMasterManager.stop();
3153 }
3154 }
3155 }
3156
3157 @InterfaceAudience.Private
3158 protected void checkServiceStarted() throws ServerNotRunningYetException {
3159 if (!serviceStarted) {
3160 throw new ServerNotRunningYetException("Server is not running yet");
3161 }
3162 }
3163
3164 void checkInitialized() throws PleaseHoldException, ServerNotRunningYetException,
3165 MasterNotRunningException, MasterStoppedException {
3166 checkServiceStarted();
3167 if (!isInitialized()) {
3168 throw new PleaseHoldException("Master is initializing");
3169 }
3170 if (isStopped()) {
3171 throw new MasterStoppedException();
3172 }
3173 }
3174
3175 /**
3176 * Report whether this master is currently the active master or not. If not active master, we are
3177 * parked on ZK waiting to become active. This method is used for testing.
3178 * @return true if active master, false if not.
3179 */
3180 @Override
3181 public boolean isActiveMaster() {
3182 return activeMaster;
3183 }
3184
3185 /**
3186 * Report whether this master has completed its initialization and is ready. If ready, the
3187 * master is also the active master. A standby master is never ready. This method is used for
3188 * testing.
3189 * @return true if master is ready to go, false if not.
3190 */
3191 @Override
3192 public boolean isInitialized() {
3193 return initialized.isReady();
3194 }
3195
3196 /**
3197 * Report whether this master is started. This method is used for testing.
3198 * @return true if master is started, false if not.
3199 */
3200
3201 @Override
3202 public boolean isOnline() {
3203 return serviceStarted;
3204 }
3205
3206 /**
3207 * Report whether this master is in maintenance mode.
3208 * @return true if master is in maintenanceMode
3209 */
3210 @Override
3211 public boolean isInMaintenanceMode() {
3212 return maintenanceMode;
3213 }
3214
3215 @InterfaceAudience.Private
3216 public void setInitialized(boolean isInitialized) {
3217 procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized);
3218 }
3219
3220 @Override
3221 public ProcedureEvent<?> getInitializedEvent() {
3222 return initialized;
3223 }
3224
3225 /**
3226 * Compute the average load across all region servers. Currently, this uses a very naive
3227 * computation - just uses the number of regions being served, ignoring stats about number of
3228 * requests.
3229 * @return the average load
3230 */
3231 public double getAverageLoad() {
3232 if (this.assignmentManager == null) {
3233 return 0;
3234 }
3235
3236 RegionStates regionStates = this.assignmentManager.getRegionStates();
3237 if (regionStates == null) {
3238 return 0;
3239 }
3240 return regionStates.getAverageLoad();
3241 }
3242
3243 @Override
3244 public boolean registerService(Service instance) {
3245 /*
3246 * No stacking of instances is allowed for a single service name
3247 */
3248 Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
3249 String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
3250 if (coprocessorServiceHandlers.containsKey(serviceName)) {
3251 LOG.error("Coprocessor service " + serviceName
3252 + " already registered, rejecting request from " + instance);
3253 return false;
3254 }
3255
3256 coprocessorServiceHandlers.put(serviceName, instance);
3257 if (LOG.isDebugEnabled()) {
3258 LOG.debug("Registered master coprocessor service: service=" + serviceName);
3259 }
3260 return true;
3261 }
3262
3263 /**
3264 * Utility for constructing an instance of the passed HMaster class. @return HMaster instance.
3265 */
3266 public static HMaster constructMaster(Class<? extends HMaster> masterClass,
3267 final Configuration conf) {
3268 try {
3269 Constructor<? extends HMaster> c = masterClass.getConstructor(Configuration.class);
3270 return c.newInstance(conf);
3271 } catch (Exception e) {
3272 Throwable error = e;
3273 if (
3274 e instanceof InvocationTargetException
3275 && ((InvocationTargetException) e).getTargetException() != null
3276 ) {
3277 error = ((InvocationTargetException) e).getTargetException();
3278 }
3279 throw new RuntimeException("Failed construction of Master: " + masterClass.toString() + ". ",
3280 error);
3281 }
3282 }
3283
3284 /**
3285 * @see org.apache.hadoop.hbase.master.HMasterCommandLine
3286 */
3287 public static void main(String[] args) {
3288 LOG.info("STARTING service " + HMaster.class.getSimpleName());
3289 VersionInfo.logVersion();
3290 new HMasterCommandLine(HMaster.class).doMain(args);
3291 }
3292
3293 public HFileCleaner getHFileCleaner() {
3294 return this.hfileCleaners.get(0);
3295 }
3296
3297 public List<HFileCleaner> getHFileCleaners() {
3298 return this.hfileCleaners;
3299 }
3300
3301 public LogCleaner getLogCleaner() {
3302 return this.logCleaner;
3303 }
3304
3305 /** Returns the underlying snapshot manager */
3306 @Override
3307 public SnapshotManager getSnapshotManager() {
3308 return this.snapshotManager;
3309 }
3310
3311 /** Returns the underlying MasterProcedureManagerHost */
3312 @Override
3313 public MasterProcedureManagerHost getMasterProcedureManagerHost() {
3314 return mpmHost;
3315 }
3316
3317 @Override
3318 public ClusterSchema getClusterSchema() {
3319 return this.clusterSchemaService;
3320 }
3321
3322 /**
3323 * Create a new Namespace.
3324 * @param namespaceDescriptor descriptor for new Namespace
3325 * @param nonceGroup Identifier for the source of the request, a client or process.
3326 * @param nonce A unique identifier for this operation from the client or process
3327 * identified by <code>nonceGroup</code> (the source must ensure each
3328 * operation gets a unique id).
3329 * @return procedure id 3330 */ 3331 long createNamespace(final NamespaceDescriptor namespaceDescriptor, final long nonceGroup, 3332 final long nonce) throws IOException { 3333 checkInitialized(); 3334 3335 TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName())); 3336 3337 return MasterProcedureUtil 3338 .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { 3339 @Override 3340 protected void run() throws IOException { 3341 getMaster().getMasterCoprocessorHost().preCreateNamespace(namespaceDescriptor); 3342 // We need to wait for the procedure to potentially fail due to "prepare" sanity 3343 // checks. This will block only the beginning of the procedure. See HBASE-19953. 3344 ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch(); 3345 LOG.info(getClientIdAuditPrefix() + " creating " + namespaceDescriptor); 3346 // Execute the operation synchronously - wait for the operation to complete before 3347 // continuing. 3348 setProcId(getClusterSchema().createNamespace(namespaceDescriptor, getNonceKey(), latch)); 3349 latch.await(); 3350 getMaster().getMasterCoprocessorHost().postCreateNamespace(namespaceDescriptor); 3351 } 3352 3353 @Override 3354 protected String getDescription() { 3355 return "CreateNamespaceProcedure"; 3356 } 3357 }); 3358 } 3359 3360 /** 3361 * Modify an existing Namespace. 3362 * @param nonceGroup Identifier for the source of the request, a client or process. 3363 * @param nonce A unique identifier for this operation from the client or process identified 3364 * by <code>nonceGroup</code> (the source must ensure each operation gets a 3365 * unique id). 3366 * @return procedure id 3367 */ 3368 long modifyNamespace(final NamespaceDescriptor newNsDescriptor, final long nonceGroup, 3369 final long nonce) throws IOException { 3370 checkInitialized(); 3371 3372 TableName.isLegalNamespaceName(Bytes.toBytes(newNsDescriptor.getName())); 3373 3374 return MasterProcedureUtil 3375 .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { 3376 @Override 3377 protected void run() throws IOException { 3378 NamespaceDescriptor oldNsDescriptor = getNamespace(newNsDescriptor.getName()); 3379 getMaster().getMasterCoprocessorHost().preModifyNamespace(oldNsDescriptor, 3380 newNsDescriptor); 3381 // We need to wait for the procedure to potentially fail due to "prepare" sanity 3382 // checks. This will block only the beginning of the procedure. See HBASE-19953. 3383 ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch(); 3384 LOG.info(getClientIdAuditPrefix() + " modify " + newNsDescriptor); 3385 // Execute the operation synchronously - wait for the operation to complete before 3386 // continuing. 3387 setProcId(getClusterSchema().modifyNamespace(newNsDescriptor, getNonceKey(), latch)); 3388 latch.await(); 3389 getMaster().getMasterCoprocessorHost().postModifyNamespace(oldNsDescriptor, 3390 newNsDescriptor); 3391 } 3392 3393 @Override 3394 protected String getDescription() { 3395 return "ModifyNamespaceProcedure"; 3396 } 3397 }); 3398 } 3399 3400 /** 3401 * Delete an existing Namespace. Only empty Namespaces (no tables) can be removed. 3402 * @param nonceGroup Identifier for the source of the request, a client or process. 3403 * @param nonce A unique identifier for this operation from the client or process identified 3404 * by <code>nonceGroup</code> (the source must ensure each operation gets a 3405 * unique id). 
3406 * @return procedure id 3407 */ 3408 long deleteNamespace(final String name, final long nonceGroup, final long nonce) 3409 throws IOException { 3410 checkInitialized(); 3411 3412 return MasterProcedureUtil 3413 .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { 3414 @Override 3415 protected void run() throws IOException { 3416 getMaster().getMasterCoprocessorHost().preDeleteNamespace(name); 3417 LOG.info(getClientIdAuditPrefix() + " delete " + name); 3418 // Execute the operation synchronously - wait for the operation to complete before 3419 // continuing. 3420 // 3421 // We need to wait for the procedure to potentially fail due to "prepare" sanity 3422 // checks. This will block only the beginning of the procedure. See HBASE-19953. 3423 ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch(); 3424 setProcId(submitProcedure( 3425 new DeleteNamespaceProcedure(procedureExecutor.getEnvironment(), name, latch))); 3426 latch.await(); 3427 // Will not be invoked in the face of Exception thrown by the Procedure's execution 3428 getMaster().getMasterCoprocessorHost().postDeleteNamespace(name); 3429 } 3430 3431 @Override 3432 protected String getDescription() { 3433 return "DeleteNamespaceProcedure"; 3434 } 3435 }); 3436 } 3437 3438 /** 3439 * Get a Namespace 3440 * @param name Name of the Namespace 3441 * @return Namespace descriptor for <code>name</code> 3442 */ 3443 NamespaceDescriptor getNamespace(String name) throws IOException { 3444 checkInitialized(); 3445 if (this.cpHost != null) this.cpHost.preGetNamespaceDescriptor(name); 3446 NamespaceDescriptor nsd = this.clusterSchemaService.getNamespace(name); 3447 if (this.cpHost != null) this.cpHost.postGetNamespaceDescriptor(nsd); 3448 return nsd; 3449 } 3450 3451 /** 3452 * Get all Namespaces 3453 * @return All Namespace descriptors 3454 */ 3455 List<NamespaceDescriptor> getNamespaces() throws IOException { 3456 checkInitialized(); 3457 final List<NamespaceDescriptor> nsds = new ArrayList<>(); 3458 if (cpHost != null) { 3459 cpHost.preListNamespaceDescriptors(nsds); 3460 } 3461 nsds.addAll(this.clusterSchemaService.getNamespaces()); 3462 if (this.cpHost != null) { 3463 this.cpHost.postListNamespaceDescriptors(nsds); 3464 } 3465 return nsds; 3466 } 3467 3468 /** 3469 * List namespace names 3470 * @return All namespace names 3471 */ 3472 public List<String> listNamespaces() throws IOException { 3473 checkInitialized(); 3474 List<String> namespaces = new ArrayList<>(); 3475 if (cpHost != null) { 3476 cpHost.preListNamespaces(namespaces); 3477 } 3478 for (NamespaceDescriptor namespace : clusterSchemaService.getNamespaces()) { 3479 namespaces.add(namespace.getName()); 3480 } 3481 if (cpHost != null) { 3482 cpHost.postListNamespaces(namespaces); 3483 } 3484 return namespaces; 3485 } 3486 3487 @Override 3488 public List<TableName> listTableNamesByNamespace(String name) throws IOException { 3489 checkInitialized(); 3490 return listTableNames(name, null, true); 3491 } 3492 3493 @Override 3494 public List<TableDescriptor> listTableDescriptorsByNamespace(String name) throws IOException { 3495 checkInitialized(); 3496 return listTableDescriptors(name, null, null, true); 3497 } 3498 3499 @Override 3500 public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning) 3501 throws IOException { 3502 if (cpHost != null) { 3503 cpHost.preAbortProcedure(this.procedureExecutor, procId); 3504 } 3505 3506 final boolean result = this.procedureExecutor.abort(procId, 
mayInterruptIfRunning);
3507
3508 if (cpHost != null) {
3509 cpHost.postAbortProcedure();
3510 }
3511
3512 return result;
3513 }
3514
3515 @Override
3516 public List<Procedure<?>> getProcedures() throws IOException {
3517 if (cpHost != null) {
3518 cpHost.preGetProcedures();
3519 }
3520
3521 @SuppressWarnings({ "unchecked", "rawtypes" })
3522 List<Procedure<?>> procList = (List) this.procedureExecutor.getProcedures();
3523
3524 if (cpHost != null) {
3525 cpHost.postGetProcedures(procList);
3526 }
3527
3528 return procList;
3529 }
3530
3531 @Override
3532 public List<LockedResource> getLocks() throws IOException {
3533 if (cpHost != null) {
3534 cpHost.preGetLocks();
3535 }
3536
3537 MasterProcedureScheduler procedureScheduler =
3538 procedureExecutor.getEnvironment().getProcedureScheduler();
3539
3540 final List<LockedResource> lockedResources = procedureScheduler.getLocks();
3541
3542 if (cpHost != null) {
3543 cpHost.postGetLocks(lockedResources);
3544 }
3545
3546 return lockedResources;
3547 }
3548
3549 /**
3550 * Returns the list of table descriptors that match the specified request
3551 * @param namespace the namespace to query, or null if querying for all
3552 * @param regex The regular expression to match against, or null if querying for all
3553 * @param tableNameList the list of table names, or null if querying for all
3554 * @param includeSysTables False to match only against userspace tables
3555 * @return the list of table descriptors
3556 */
3557 public List<TableDescriptor> listTableDescriptors(final String namespace, final String regex,
3558 final List<TableName> tableNameList, final boolean includeSysTables) throws IOException {
3559 List<TableDescriptor> htds = new ArrayList<>();
3560 if (cpHost != null) {
3561 cpHost.preGetTableDescriptors(tableNameList, htds, regex);
3562 }
3563 htds = getTableDescriptors(htds, namespace, regex, tableNameList, includeSysTables);
3564 if (cpHost != null) {
3565 cpHost.postGetTableDescriptors(tableNameList, htds, regex);
3566 }
3567 return htds;
3568 }
3569
3570 /**
3571 * Returns the list of table names that match the specified request
3572 * @param regex The regular expression to match against, or null if querying for all
3573 * @param namespace the namespace to query, or null if querying for all
3574 * @param includeSysTables False to match only against userspace tables
3575 * @return the list of table names
3576 */
3577 public List<TableName> listTableNames(final String namespace, final String regex,
3578 final boolean includeSysTables) throws IOException {
3579 List<TableDescriptor> htds = new ArrayList<>();
3580 if (cpHost != null) {
3581 cpHost.preGetTableNames(htds, regex);
3582 }
3583 htds = getTableDescriptors(htds, namespace, regex, null, includeSysTables);
3584 if (cpHost != null) {
3585 cpHost.postGetTableNames(htds, regex);
3586 }
3587 List<TableName> result = new ArrayList<>(htds.size());
3588 for (TableDescriptor htd : htds)
3589 result.add(htd.getTableName());
3590 return result;
3591 }
3592
3593 /**
3594 * Return a list of table descriptors after applying any provided filter parameters. Note
3595 * that the user-facing description of this filter logic is presented on the class-level javadoc
3596 * of {@link NormalizeTableFilterParams}.
3597 */
3598 private List<TableDescriptor> getTableDescriptors(final List<TableDescriptor> htds,
3599 final String namespace, final String regex, final List<TableName> tableNameList,
3600 final boolean includeSysTables) throws IOException {
3601 if (tableNameList == null || tableNameList.isEmpty()) {
3602 // request for all TableDescriptors
3603 Collection<TableDescriptor> allHtds;
3604 if (namespace != null && namespace.length() > 0) {
3605 // Do a check on the namespace existence. Will fail if it does not exist.
3606 this.clusterSchemaService.getNamespace(namespace);
3607 allHtds = tableDescriptors.getByNamespace(namespace).values();
3608 } else {
3609 allHtds = tableDescriptors.getAll().values();
3610 }
3611 for (TableDescriptor desc : allHtds) {
3612 if (
3613 tableStateManager.isTablePresent(desc.getTableName())
3614 && (includeSysTables || !desc.getTableName().isSystemTable())
3615 ) {
3616 htds.add(desc);
3617 }
3618 }
3619 } else {
3620 for (TableName s : tableNameList) {
3621 if (tableStateManager.isTablePresent(s)) {
3622 TableDescriptor desc = tableDescriptors.get(s);
3623 if (desc != null) {
3624 htds.add(desc);
3625 }
3626 }
3627 }
3628 }
3629
3630 // Retains only those matched by the regular expression.
3631 if (regex != null) filterTablesByRegex(htds, Pattern.compile(regex));
3632 return htds;
3633 }
3634
3635 /**
3636 * Removes the table descriptors that don't match the pattern.
3637 * @param descriptors list of table descriptors to filter
3638 * @param pattern the regex to use
3639 */
3640 private static void filterTablesByRegex(final Collection<TableDescriptor> descriptors,
3641 final Pattern pattern) {
3642 final String defaultNS = NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
3643 Iterator<TableDescriptor> itr = descriptors.iterator();
3644 while (itr.hasNext()) {
3645 TableDescriptor htd = itr.next();
3646 String tableName = htd.getTableName().getNameAsString();
3647 boolean matched = pattern.matcher(tableName).matches();
3648 if (!matched && htd.getTableName().getNamespaceAsString().equals(defaultNS)) {
3649 matched = pattern.matcher(defaultNS + TableName.NAMESPACE_DELIM + tableName).matches();
3650 }
3651 if (!matched) {
3652 itr.remove();
3653 }
3654 }
3655 }
3656
3657 @Override
3658 public long getLastMajorCompactionTimestamp(TableName table) throws IOException {
3659 return getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
3660 .getLastMajorCompactionTimestamp(table);
3661 }
3662
3663 @Override
3664 public long getLastMajorCompactionTimestampForRegion(byte[] regionName) throws IOException {
3665 return getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
3666 .getLastMajorCompactionTimestamp(regionName);
3667 }
3668
3669 /**
3670 * Gets the mob file compaction state for a specific table. Whether all the mob files are selected
3671 * is only known during compaction execution, but the statistic is gathered just before the
3672 * compaction starts, when the compaction type is hard to know, so rough statistics are used
3673 * for mob file compactions. Only two compaction states are reported:
3674 * CompactionState.MAJOR_AND_MINOR and CompactionState.NONE.
3675 * @param tableName The current table name.
3676 * @return Whether the given table is undergoing a mob file compaction now.
3677 */
3678 public GetRegionInfoResponse.CompactionState getMobCompactionState(TableName tableName) {
3679 AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
3680 if (compactionsCount != null && compactionsCount.get() != 0) {
3681 return GetRegionInfoResponse.CompactionState.MAJOR_AND_MINOR;
3682 }
3683 return GetRegionInfoResponse.CompactionState.NONE;
3684 }
3685
3686 public void reportMobCompactionStart(TableName tableName) throws IOException {
3687 IdLock.Entry lockEntry = null;
3688 try {
3689 lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
3690 AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
3691 if (compactionsCount == null) {
3692 compactionsCount = new AtomicInteger(0);
3693 mobCompactionStates.put(tableName, compactionsCount);
3694 }
3695 compactionsCount.incrementAndGet();
3696 } finally {
3697 if (lockEntry != null) {
3698 mobCompactionLock.releaseLockEntry(lockEntry);
3699 }
3700 }
3701 }
3702
3703 public void reportMobCompactionEnd(TableName tableName) throws IOException {
3704 IdLock.Entry lockEntry = null;
3705 try {
3706 lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
3707 AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
3708 if (compactionsCount != null) {
3709 int count = compactionsCount.decrementAndGet();
3710 // remove the entry if the count is 0.
3711 if (count == 0) {
3712 mobCompactionStates.remove(tableName);
3713 }
3714 }
3715 } finally {
3716 if (lockEntry != null) {
3717 mobCompactionLock.releaseLockEntry(lockEntry);
3718 }
3719 }
3720 }
3721
3722 /**
3723 * Queries the state of the {@link LoadBalancerTracker}. If the balancer is not initialized, false
3724 * is returned.
3725 * @return The state of the load balancer, or false if the load balancer isn't defined.
3726 */
3727 public boolean isBalancerOn() {
3728 return !isInMaintenanceMode() && loadBalancerTracker != null
3729 && loadBalancerTracker.isBalancerOn();
3730 }
3731
3732 /**
3733 * Queries the state of the {@link RegionNormalizerTracker}. If it's not initialized, false is
3734 * returned.
3735 */
3736 public boolean isNormalizerOn() {
3737 return !isInMaintenanceMode() && getRegionNormalizerManager().isNormalizerOn();
3738 }
3739
3740 /**
3741 * Queries the state of the {@link SplitOrMergeTracker}. If it is not initialized, false is
3742 * returned. If switchType is illegal, false will be returned.
3743 * @param switchType see {@link org.apache.hadoop.hbase.client.MasterSwitchType}
3744 * @return The state of the switch
3745 */
3746 @Override
3747 public boolean isSplitOrMergeEnabled(MasterSwitchType switchType) {
3748 return !isInMaintenanceMode() && splitOrMergeTracker != null
3749 && splitOrMergeTracker.isSplitOrMergeEnabled(switchType);
3750 }
3751
3752 /**
3753 * Fetch the configured {@link LoadBalancer} class name. If none is set, a default is returned.
3754 * @return The name of the {@link LoadBalancer} in use.
3755 */ 3756 public String getLoadBalancerClassName() { 3757 return conf.get(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, 3758 LoadBalancerFactory.getDefaultLoadBalancerClass().getName()); 3759 } 3760 3761 public SplitOrMergeTracker getSplitOrMergeTracker() { 3762 return splitOrMergeTracker; 3763 } 3764 3765 @Override 3766 public LoadBalancer getLoadBalancer() { 3767 return balancer; 3768 } 3769 3770 @Override 3771 public FavoredNodesManager getFavoredNodesManager() { 3772 return favoredNodesManager; 3773 } 3774 3775 private long executePeerProcedure(ModifyPeerProcedure procedure) throws IOException { 3776 long procId = procedureExecutor.submitProcedure(procedure); 3777 procedure.getLatch().await(); 3778 return procId; 3779 } 3780 3781 @Override 3782 public long addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) 3783 throws ReplicationException, IOException { 3784 LOG.info(getClientIdAuditPrefix() + " creating replication peer, id=" + peerId + ", config=" 3785 + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED")); 3786 return executePeerProcedure(new AddPeerProcedure(peerId, peerConfig, enabled)); 3787 } 3788 3789 @Override 3790 public long removeReplicationPeer(String peerId) throws ReplicationException, IOException { 3791 LOG.info(getClientIdAuditPrefix() + " removing replication peer, id=" + peerId); 3792 return executePeerProcedure(new RemovePeerProcedure(peerId)); 3793 } 3794 3795 @Override 3796 public long enableReplicationPeer(String peerId) throws ReplicationException, IOException { 3797 LOG.info(getClientIdAuditPrefix() + " enable replication peer, id=" + peerId); 3798 return executePeerProcedure(new EnablePeerProcedure(peerId)); 3799 } 3800 3801 @Override 3802 public long disableReplicationPeer(String peerId) throws ReplicationException, IOException { 3803 LOG.info(getClientIdAuditPrefix() + " disable replication peer, id=" + peerId); 3804 return executePeerProcedure(new DisablePeerProcedure(peerId)); 3805 } 3806 3807 @Override 3808 public ReplicationPeerConfig getReplicationPeerConfig(String peerId) 3809 throws ReplicationException, IOException { 3810 if (cpHost != null) { 3811 cpHost.preGetReplicationPeerConfig(peerId); 3812 } 3813 LOG.info(getClientIdAuditPrefix() + " get replication peer config, id=" + peerId); 3814 ReplicationPeerConfig peerConfig = this.replicationPeerManager.getPeerConfig(peerId) 3815 .orElseThrow(() -> new ReplicationPeerNotFoundException(peerId)); 3816 if (cpHost != null) { 3817 cpHost.postGetReplicationPeerConfig(peerId); 3818 } 3819 return peerConfig; 3820 } 3821 3822 @Override 3823 public long updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig) 3824 throws ReplicationException, IOException { 3825 LOG.info(getClientIdAuditPrefix() + " update replication peer config, id=" + peerId 3826 + ", config=" + peerConfig); 3827 return executePeerProcedure(new UpdatePeerConfigProcedure(peerId, peerConfig)); 3828 } 3829 3830 @Override 3831 public List<ReplicationPeerDescription> listReplicationPeers(String regex) 3832 throws ReplicationException, IOException { 3833 if (cpHost != null) { 3834 cpHost.preListReplicationPeers(regex); 3835 } 3836 LOG.debug("{} list replication peers, regex={}", getClientIdAuditPrefix(), regex); 3837 Pattern pattern = regex == null ? 
null : Pattern.compile(regex);
3838 List<ReplicationPeerDescription> peers = this.replicationPeerManager.listPeers(pattern);
3839 if (cpHost != null) {
3840 cpHost.postListReplicationPeers(regex);
3841 }
3842 return peers;
3843 }
3844
3845 /**
3846 * Mark region server(s) as decommissioned (previously called 'draining') to prevent additional
3847 * regions from getting assigned to them. Also unload the regions on the servers asynchronously.
3848 * @param servers Region servers to decommission.
3849 */
3850 public void decommissionRegionServers(final List<ServerName> servers, final boolean offload)
3851 throws HBaseIOException {
3852 List<ServerName> serversAdded = new ArrayList<>(servers.size());
3853 // Place the decommission marker first.
3854 String parentZnode = getZooKeeper().getZNodePaths().drainingZNode;
3855 for (ServerName server : servers) {
3856 try {
3857 String node = ZNodePaths.joinZNode(parentZnode, server.getServerName());
3858 ZKUtil.createAndFailSilent(getZooKeeper(), node);
3859 } catch (KeeperException ke) {
3860 throw new HBaseIOException(
3861 this.zooKeeper.prefix("Unable to decommission '" + server.getServerName() + "'."), ke);
3862 }
3863 if (this.serverManager.addServerToDrainList(server)) {
3864 serversAdded.add(server);
3865 }
3866
3867 }
3868 // Move the regions off the decommissioned servers.
3869 if (offload) {
3870 final List<ServerName> destServers = this.serverManager.createDestinationServersList();
3871 for (ServerName server : serversAdded) {
3872 final List<RegionInfo> regionsOnServer = this.assignmentManager.getRegionsOnServer(server);
3873 for (RegionInfo hri : regionsOnServer) {
3874 ServerName dest = balancer.randomAssignment(hri, destServers);
3875 if (dest == null) {
3876 throw new HBaseIOException("Unable to determine a plan to move " + hri);
3877 }
3878 RegionPlan rp = new RegionPlan(hri, server, dest);
3879 this.assignmentManager.moveAsync(rp);
3880 }
3881 }
3882 }
3883 }
3884
3885 /**
3886 * List region servers marked as decommissioned (previously called 'draining') to not get regions
3887 * assigned to them.
3888 * @return List of decommissioned servers.
3889 */
3890 public List<ServerName> listDecommissionedRegionServers() {
3891 return this.serverManager.getDrainingServersList();
3892 }
3893
3894 /**
3895 * Remove decommission marker (previously called 'draining') from a region server to allow region
3896 * assignments. Regions are loaded onto the server asynchronously if a list of regions is given.
3897 * @param server Region server to remove decommission marker from.
3898 */
3899 public void recommissionRegionServer(final ServerName server,
3900 final List<byte[]> encodedRegionNames) throws IOException {
3901 // Remove the server from decommissioned (draining) server list.
3902 String parentZnode = getZooKeeper().getZNodePaths().drainingZNode;
3903 String node = ZNodePaths.joinZNode(parentZnode, server.getServerName());
3904 try {
3905 ZKUtil.deleteNodeFailSilent(getZooKeeper(), node);
3906 } catch (KeeperException ke) {
3907 throw new HBaseIOException(
3908 this.zooKeeper.prefix("Unable to recommission '" + server.getServerName() + "'."), ke);
3909 }
3910 this.serverManager.removeServerFromDrainList(server);
3911
3912 // Load the regions onto the server if we are given a list of regions.
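// If no region list was supplied, or the server is not yet online, only the
// decommission marker is cleared; no region moves are scheduled here.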
3913 if (encodedRegionNames == null || encodedRegionNames.isEmpty()) { 3914 return; 3915 } 3916 if (!this.serverManager.isServerOnline(server)) { 3917 return; 3918 } 3919 for (byte[] encodedRegionName : encodedRegionNames) { 3920 RegionState regionState = 3921 assignmentManager.getRegionStates().getRegionState(Bytes.toString(encodedRegionName)); 3922 if (regionState == null) { 3923 LOG.warn("Unknown region " + Bytes.toStringBinary(encodedRegionName)); 3924 continue; 3925 } 3926 RegionInfo hri = regionState.getRegion(); 3927 if (server.equals(regionState.getServerName())) { 3928 LOG.info("Skipping move of region " + hri.getRegionNameAsString() 3929 + " because region already assigned to the same server " + server + "."); 3930 continue; 3931 } 3932 RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), server); 3933 this.assignmentManager.moveAsync(rp); 3934 } 3935 } 3936 3937 @Override 3938 public LockManager getLockManager() { 3939 return lockManager; 3940 } 3941 3942 public QuotaObserverChore getQuotaObserverChore() { 3943 return this.quotaObserverChore; 3944 } 3945 3946 public SpaceQuotaSnapshotNotifier getSpaceQuotaSnapshotNotifier() { 3947 return this.spaceQuotaSnapshotNotifier; 3948 } 3949 3950 @SuppressWarnings("unchecked") 3951 private RemoteProcedure<MasterProcedureEnv, ?> getRemoteProcedure(long procId) { 3952 Procedure<?> procedure = procedureExecutor.getProcedure(procId); 3953 if (procedure == null) { 3954 return null; 3955 } 3956 assert procedure instanceof RemoteProcedure; 3957 return (RemoteProcedure<MasterProcedureEnv, ?>) procedure; 3958 } 3959 3960 public void remoteProcedureCompleted(long procId) { 3961 LOG.debug("Remote procedure done, pid={}", procId); 3962 RemoteProcedure<MasterProcedureEnv, ?> procedure = getRemoteProcedure(procId); 3963 if (procedure != null) { 3964 procedure.remoteOperationCompleted(procedureExecutor.getEnvironment()); 3965 } 3966 } 3967 3968 public void remoteProcedureFailed(long procId, RemoteProcedureException error) { 3969 LOG.debug("Remote procedure failed, pid={}", procId, error); 3970 RemoteProcedure<MasterProcedureEnv, ?> procedure = getRemoteProcedure(procId); 3971 if (procedure != null) { 3972 procedure.remoteOperationFailed(procedureExecutor.getEnvironment(), error); 3973 } 3974 } 3975 3976 /** 3977 * Reopen regions provided in the argument 3978 * @param tableName The current table name 3979 * @param regionNames The region names of the regions to reopen 3980 * @param nonceGroup Identifier for the source of the request, a client or process 3981 * @param nonce A unique identifier for this operation from the client or process identified 3982 * by <code>nonceGroup</code> (the source must ensure each operation gets a 3983 * unique id). 
3984 * @return procedure Id 3985 * @throws IOException if reopening region fails while running procedure 3986 */ 3987 long reopenRegions(final TableName tableName, final List<byte[]> regionNames, 3988 final long nonceGroup, final long nonce) throws IOException { 3989 3990 return MasterProcedureUtil 3991 .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { 3992 3993 @Override 3994 protected void run() throws IOException { 3995 submitProcedure(new ReopenTableRegionsProcedure(tableName, regionNames)); 3996 } 3997 3998 @Override 3999 protected String getDescription() { 4000 return "ReopenTableRegionsProcedure"; 4001 } 4002 4003 }); 4004 4005 } 4006 4007 @Override 4008 public ReplicationPeerManager getReplicationPeerManager() { 4009 return replicationPeerManager; 4010 } 4011 4012 public HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> 4013 getReplicationLoad(ServerName[] serverNames) { 4014 List<ReplicationPeerDescription> peerList = this.getReplicationPeerManager().listPeers(null); 4015 if (peerList == null) { 4016 return null; 4017 } 4018 HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoadSourceMap = 4019 new HashMap<>(peerList.size()); 4020 peerList.stream() 4021 .forEach(peer -> replicationLoadSourceMap.put(peer.getPeerId(), new ArrayList<>())); 4022 for (ServerName serverName : serverNames) { 4023 List<ReplicationLoadSource> replicationLoadSources = 4024 getServerManager().getLoad(serverName).getReplicationLoadSourceList(); 4025 for (ReplicationLoadSource replicationLoadSource : replicationLoadSources) { 4026 List<Pair<ServerName, ReplicationLoadSource>> replicationLoadSourceList = 4027 replicationLoadSourceMap.get(replicationLoadSource.getPeerID()); 4028 if (replicationLoadSourceList == null) { 4029 LOG.debug("{} does not exist, but it exists " 4030 + "in znode(/hbase/replication/rs). 
When the rs restarts, the peerId is deleted, so "
4031 + "we just need to ignore it", replicationLoadSource.getPeerID());
4032 continue;
4033 }
4034 replicationLoadSourceList.add(new Pair<>(serverName, replicationLoadSource));
4035 }
4036 }
4037 for (List<Pair<ServerName, ReplicationLoadSource>> loads : replicationLoadSourceMap.values()) {
4038 if (loads.size() > 0) {
4039 loads.sort(Comparator.comparingLong(load -> (-1) * load.getSecond().getReplicationLag()));
4040 }
4041 }
4042 return replicationLoadSourceMap;
4043 }
4044
4045 /**
4046 * This method modifies the master's configuration in order to inject replication-related features.
4047 */
4048 @InterfaceAudience.Private
4049 public static void decorateMasterConfiguration(Configuration conf) {
4050 String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
4051 String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
4052 if (!plugins.contains(cleanerClass)) {
4053 conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
4054 }
4055 if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
4056 plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
4057 cleanerClass = ReplicationHFileCleaner.class.getCanonicalName();
4058 if (!plugins.contains(cleanerClass)) {
4059 conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, plugins + "," + cleanerClass);
4060 }
4061 }
4062 }
4063
4064 @Override
4065 public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() {
4066 if (!this.isOnline() || !LoadBalancer.isMasterCanHostUserRegions(conf)) {
4067 return new HashMap<>();
4068 }
4069 return super.getWalGroupsReplicationStatus();
4070 }
4071
4072 public HbckChore getHbckChore() {
4073 return this.hbckChore;
4074 }
4075
4076 @Override
4077 public String getClusterId() {
4078 if (activeMaster) {
4079 return super.getClusterId();
4080 }
4081 return cachedClusterId.getFromCacheOrFetch();
4082 }
4083
4084 @Override
4085 public void runReplicationBarrierCleaner() {
4086 ReplicationBarrierCleaner rbc = this.replicationBarrierCleaner;
4087 if (rbc != null) {
4088 rbc.chore();
4089 }
4090 }
4091
4092 public SnapshotQuotaObserverChore getSnapshotQuotaObserverChore() {
4093 return this.snapshotQuotaChore;
4094 }
4095
4096 /**
4097 * Get the compaction state of the table.
4098 * @param tableName The table name
4099 * @return CompactionState Compaction state of the table
4100 */
4101 public CompactionState getCompactionState(final TableName tableName) {
4102 CompactionState compactionState = CompactionState.NONE;
4103 try {
4104 List<RegionInfo> regions = assignmentManager.getRegionStates().getRegionsOfTable(tableName);
4105 for (RegionInfo regionInfo : regions) {
4106 ServerName serverName =
4107 assignmentManager.getRegionStates().getRegionServerOfRegion(regionInfo);
4108 if (serverName == null) {
4109 continue;
4110 }
4111 ServerMetrics sl = serverManager.getLoad(serverName);
4112 if (sl == null) {
4113 continue;
4114 }
4115 RegionMetrics regionMetrics = sl.getRegionMetrics().get(regionInfo.getRegionName());
4116 if (regionMetrics.getCompactionState() == CompactionState.MAJOR) {
4117 if (compactionState == CompactionState.MINOR) {
4118 compactionState = CompactionState.MAJOR_AND_MINOR;
4119 } else {
4120 compactionState = CompactionState.MAJOR;
4121 }
4122 } else if (regionMetrics.getCompactionState() == CompactionState.MINOR) {
4123 if (compactionState == CompactionState.MAJOR) {
4124 compactionState = CompactionState.MAJOR_AND_MINOR;
4125 } else {
4126 compactionState = CompactionState.MINOR;
4127 }
4128 }
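// Aggregation across regions: once both MAJOR and MINOR compactions have been seen,
// the table-level state is reported as MAJOR_AND_MINOR.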
4129 }
4130 } catch (Exception e) {
4131 compactionState = null;
4132 LOG.error("Exception when getting compaction state for " + tableName.getNameAsString(), e);
4133 }
4134 return compactionState;
4135 }
4136
4137 @Override
4138 public MetaLocationSyncer getMetaLocationSyncer() {
4139 return metaLocationSyncer;
4140 }
4141
4142 @Override
4143 public void flushMasterStore() throws IOException {
4144 LOG.info("Force flush master local region.");
4145 if (this.cpHost != null) {
4146 try {
4147 cpHost.preMasterStoreFlush();
4148 } catch (IOException ioe) {
4149 LOG.error("Error invoking master coprocessor preMasterStoreFlush()", ioe);
4150 }
4151 }
4152 masterRegion.flush(true);
4153 if (this.cpHost != null) {
4154 try {
4155 cpHost.postMasterStoreFlush();
4156 } catch (IOException ioe) {
4157 LOG.error("Error invoking master coprocessor postMasterStoreFlush()", ioe);
4158 }
4159 }
4160 }
4161
4162 @RestrictedApi(explanation = "Should only be called in tests", link = "",
4163 allowedOnPath = ".*/src/test/.*")
4164 public MasterRegion getMasterRegion() {
4165 return masterRegion;
4166 }
4167
4168 public Collection<ServerName> getLiveRegionServers() {
4169 return regionServerTracker.getRegionServers();
4170 }
4171
4172 @RestrictedApi(explanation = "Should only be called in tests", link = "",
4173 allowedOnPath = ".*/src/test/.*")
4174 void setLoadBalancer(LoadBalancer loadBalancer) {
4175 this.balancer = loadBalancer;
4176 }
4177
4178 @RestrictedApi(explanation = "Should only be called in tests", link = "",
4179 allowedOnPath = ".*/src/test/.*")
4180 void setAssignmentManager(AssignmentManager assignmentManager) {
4181 this.assignmentManager = assignmentManager;
4182 }
4183
4184 @RestrictedApi(explanation = "Should only be called in tests", link = "",
4185 allowedOnPath = ".*/src/test/.*")
4186 static void setDisableBalancerChoreForTest(boolean disable) {
4187 disableBalancerChoreForTest = disable;
4188 }
4189
4190 @Override
4191 public void onConfigurationChange(Configuration newConf) {
4192 super.onConfigurationChange(newConf);
4193 // append the quotas observer back to the master coprocessor key
4194 setQuotasObserver(newConf);
4195 // update the master coprocessor(s) if the configuration has changed.
4196 if (
4197 CoprocessorConfigurationUtil.checkConfigurationChange(getConfiguration(), newConf,
4198 CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY) && !maintenanceMode
4199 ) {
4200 LOG.info("Update the master coprocessor(s) because the configuration has changed");
4201 initializeCoprocessorHost(newConf);
4202 }
4203 }
4204
4205 @RestrictedApi(explanation = "Should only be called in tests", link = "",
4206 allowedOnPath = ".*/src/test/.*")
4207 public ConfigurationManager getConfigurationManager() {
4208 return configurationManager;
4209 }
4210
4211 private void setQuotasObserver(Configuration conf) {
4212 // Add the Observer that deletes quotas on table deletion before starting all CPs. This is
4213 // done by default when quota support is enabled, unless the user specifically asks not to
4214 if (QuotaUtil.isQuotaEnabled(conf)) {
4215 updateConfigurationForQuotasObserver(conf);
4216 }
4217 }
4218
4219 private void initializeCoprocessorHost(Configuration conf) {
4220 // initialize master side coprocessors before we start handling requests
4221 this.cpHost = new MasterCoprocessorHost(this, conf);
4222 }
4223
4224}