/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Service;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.RegionStateListener;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.coprocessor.BypassCoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.MergeRegionException;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.MasterRpcServices.BalanceSwitchMode;
import org.apache.hadoop.hbase.master.balancer.BalancerChore;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
import org.apache.hadoop.hbase.master.procedure.AddColumnFamilyProcedure;
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
import org.apache.hadoop.hbase.master.procedure.DeleteColumnFamilyProcedure;
import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
import org.apache.hadoop.hbase.master.procedure.DispatchMergingRegionsProcedure;
import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure;
import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
import org.apache.hadoop.hbase.replication.master.TableCFsUpdater;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EncryptionTest;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.util.ZKDataMigrator;
import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.MasterMaintenanceModeTracker;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
import org.apache.hadoop.hbase.zookeeper.SplitOrMergeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.mortbay.jetty.Connector;
import org.mortbay.jetty.nio.SelectChannelConnector;
import org.mortbay.jetty.servlet.Context;

/**
 * HMaster is the "master server" for HBase. An HBase cluster has one active
 * master.  If many masters are started, all compete.  Whichever wins goes on to
 * run the cluster.  All others park themselves in their constructor until
 * master or cluster shutdown or until the active master loses its lease in
 * zookeeper.  Thereafter, all running masters jostle to take over the master role.
 *
 * <p>The Master can be asked to shutdown the cluster. See {@link #shutdown()}.  In
 * this case it will tell all regionservers to go down and then wait on them
 * all reporting in that they are down.  This master will then shut itself down.
 *
 * <p>You can also shutdown just this master.  Call {@link #stopMaster()}.
 *
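 * <p>A minimal, hedged sketch of standing up a master programmatically (ordinarily
 * this is done by {@code HMasterCommandLine} or a mini-cluster; the
 * CoordinatedStateManager wiring below mirrors the command-line startup path and is
 * an assumption, not the only option):
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * CoordinatedStateManager csm =
 *     CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
 * HMaster master = new HMaster(conf, csm);
 * master.start();  // begins competing for the active master role
 * </pre>
 *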
 * @see org.apache.zookeeper.Watcher
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings("deprecation")
public class HMaster extends HRegionServer implements MasterServices {
  private static final Log LOG = LogFactory.getLog(HMaster.class.getName());

  /**
   * Protection against zombie master. Started once Master accepts active responsibility and
   * starts taking over responsibilities. Allows a finite time window before giving up ownership.
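   *
   * <p>A hedged example of tuning the monitor from a test configuration, using the
   * keys defined below (the values are illustrative, not defaults):
   * <pre>
   * conf.setLong("hbase.master.initializationmonitor.timeout", 300000L);
   * conf.setBoolean("hbase.master.initializationmonitor.haltontimeout", true);
   * </pre>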
   */
  private static class InitializationMonitor extends HasThread {
    /** The amount of time in milliseconds to sleep before checking initialization status. */
    public static final String TIMEOUT_KEY = "hbase.master.initializationmonitor.timeout";
    public static final long TIMEOUT_DEFAULT = TimeUnit.MILLISECONDS.convert(15, TimeUnit.MINUTES);

    /**
     * If the timeout expires and initialization has not completed, calls
     * {@link System#exit(int)} when true; does nothing otherwise.
     */
    public static final String HALT_KEY = "hbase.master.initializationmonitor.haltontimeout";
    public static final boolean HALT_DEFAULT = false;

    private final HMaster master;
    private final long timeout;
    private final boolean haltOnTimeout;

    /** Creates a Thread that monitors the {@link #isInitialized()} state. */
    InitializationMonitor(HMaster master) {
      super("MasterInitializationMonitor");
      this.master = master;
      this.timeout = master.getConfiguration().getLong(TIMEOUT_KEY, TIMEOUT_DEFAULT);
      this.haltOnTimeout = master.getConfiguration().getBoolean(HALT_KEY, HALT_DEFAULT);
      this.setDaemon(true);
    }

    @Override
    public void run() {
      try {
        while (!master.isStopped() && master.isActiveMaster()) {
          Thread.sleep(timeout);
          if (master.isInitialized()) {
            LOG.debug("Initialization completed within allotted tolerance. Monitor exiting.");
          } else {
            LOG.error("Master failed to complete initialization after " + timeout + "ms. Please"
                + " consider submitting a bug report including a thread dump of this process.");
            if (haltOnTimeout) {
              LOG.error("Zombie Master exiting. Thread dump to stdout");
              Threads.printThreadInfo(System.out, "Zombie HMaster");
              System.exit(-1);
            }
          }
        }
      } catch (InterruptedException ie) {
        LOG.trace("InitMonitor thread interrupted. Exiting.");
      }
    }
  }

  // MASTER is the name of the webapp and the attribute name used for stuffing this
  // instance into the web context.
  public static final String MASTER = "master";

  // Manager and zk listener for master election
  private final ActiveMasterManager activeMasterManager;
  // Region server tracker
  RegionServerTracker regionServerTracker;
  // Draining region server tracker
  private DrainingServerTracker drainingServerTracker;
  // Tracker for load balancer state
  LoadBalancerTracker loadBalancerTracker;

  // Tracker for split and merge state
  private SplitOrMergeTracker splitOrMergeTracker;

  // Tracker for region normalizer state
  private RegionNormalizerTracker regionNormalizerTracker;

  // Tracker for master maintenance mode setting
  private MasterMaintenanceModeTracker maintenanceModeTracker;

  private ClusterSchemaService clusterSchemaService;

  // Metrics for the HMaster
  final MetricsMaster metricsMaster;
  // file system manager for the master FS operations
  private MasterFileSystem fileSystemManager;
  private MasterWalManager walManager;

  // server manager to deal with region server info
  private volatile ServerManager serverManager;

  // manager of assignment nodes in zookeeper
  private AssignmentManager assignmentManager;

  // buffer for "fatal error" notices from region servers
  // in the cluster. This is only used for assisting
  // operations/debugging.
  MemoryBoundedLogMessageBuffer rsFatals;

  // flag set after we become the active master (used for testing)
  private volatile boolean isActiveMaster = false;

  // flag set after we complete initialization once active;
  // unit tests query it via isInitialized()
  private final ProcedureEvent initialized = new ProcedureEvent("master initialized");

  // flag set after master services are started;
  // initialization may not have completed yet.
  volatile boolean serviceStarted = false;

  // flag set after we complete assignMeta.
  private final ProcedureEvent serverCrashProcessingEnabled =
    new ProcedureEvent("server crash processing");

  private LoadBalancer balancer;
  private RegionNormalizer normalizer;
  private BalancerChore balancerChore;
  private RegionNormalizerChore normalizerChore;
  private ClusterStatusChore clusterStatusChore;
  private ClusterStatusPublisher clusterStatusPublisherChore = null;
  private PeriodicDoMetrics periodicDoMetricsChore = null;

  CatalogJanitor catalogJanitorChore;
  private ReplicationZKLockCleanerChore replicationZKLockCleanerChore;
  private ReplicationMetaCleaner replicationMetaCleaner;
  private LogCleaner logCleaner;
  private HFileCleaner hfileCleaner;
  private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
  private MobCompactionChore mobCompactChore;
  private MasterMobCompactionThread mobCompactThread;
  // used to synchronize the mobCompactionStates
  private final IdLock mobCompactionLock = new IdLock();
  // save the information of mob compactions in tables.
  // the key is table name, the value is the number of compactions in that table.
  private Map<TableName, AtomicInteger> mobCompactionStates = Maps.newConcurrentMap();

  MasterCoprocessorHost cpHost;

  private final boolean preLoadTableDescriptors;

  // Timestamp for when this master became active
  private long masterActiveTime;

  // should we check the compression codec type at master side, default true, HBASE-6370
  private final boolean masterCheckCompression;

  // should we check encryption settings at master side, default true
  private final boolean masterCheckEncryption;

  Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  // monitor for snapshot of hbase tables
  SnapshotManager snapshotManager;
  // monitor for distributed procedures
  private MasterProcedureManagerHost mpmHost;

  // it is assigned after the 'initialized' guard is set to true, so it should be volatile
  private volatile MasterQuotaManager quotaManager;

  private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
  private WALProcedureStore procedureStore;

  // handle table states
  private TableStateManager tableStateManager;

  private long splitPlanCount;
  private long mergePlanCount;

  /** flag used in test cases in order to simulate RS failures during master initialization */
  private volatile boolean initializationBeforeMetaAssignment = false;

  /** jetty server for master to redirect requests to regionserver infoServer */
  private org.mortbay.jetty.Server masterJettyServer;

  public static class RedirectServlet extends HttpServlet {
    private static final long serialVersionUID = 2894774810058302472L;
    private static int regionServerInfoPort;

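    // Redirects every request on the master info port to the same path on the
    // co-hosted regionserver's info server, e.g. (illustrative host and ports only)
    // http://master.example.com:16010/master-status -> http://master.example.com:16030/master-status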
    @Override
    public void doGet(HttpServletRequest request,
        HttpServletResponse response) throws ServletException, IOException {
      String redirectUrl = request.getScheme() + "://"
        + request.getServerName() + ":" + regionServerInfoPort
        + request.getRequestURI();
      response.sendRedirect(redirectUrl);
    }
  }

  private static class PeriodicDoMetrics extends ScheduledChore {
    private final HMaster server;
    public PeriodicDoMetrics(int doMetricsInterval, final HMaster server) {
      super(server.getServerName() + "-DoMetricsChore", server, doMetricsInterval);
      this.server = server;
    }

    @Override
    protected void chore() {
      server.doMetrics();
    }
  }

  /**
   * Initializes the HMaster. The steps are as follows:
   * <p>
   * <ol>
   * <li>Initialize the local HRegionServer
   * <li>Start the ActiveMasterManager.
   * </ol>
   * <p>
   * Remaining steps of initialization occur in
   * #finishActiveMasterInitialization(MonitoredTask) after
   * the master becomes the active one.
   */
  public HMaster(final Configuration conf, CoordinatedStateManager csm)
      throws IOException, KeeperException {
    super(conf, csm);
    this.rsFatals = new MemoryBoundedLogMessageBuffer(
      conf.getLong("hbase.master.buffer.for.rs.fatals", 1*1024*1024));

    LOG.info("hbase.rootdir=" + FSUtils.getRootDir(this.conf) +
      ", hbase.cluster.distributed=" + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));

    // Disable usage of meta replicas in the master
    this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);

    Replication.decorateMasterConfiguration(this.conf);

    // Hack! Maps DFSClient => Master for logs.  HDFS made this
    // config param for task trackers, but we can piggyback off of it.
    if (this.conf.get("mapreduce.task.attempt.id") == null) {
      this.conf.set("mapreduce.task.attempt.id", "hb_m_" + this.serverName.toString());
    }

    // should we check the compression codec type at master side, default true, HBASE-6370
    this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true);

    // should we check encryption settings at master side, default true
    this.masterCheckEncryption = conf.getBoolean("hbase.master.check.encryption", true);

    this.metricsMaster = new MetricsMaster(new MetricsMasterWrapperImpl(this));

    // preload table descriptor at startup
    this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);

    // Do we publish the status?
    boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
        HConstants.STATUS_PUBLISHED_DEFAULT);
    Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
        conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
            ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
            ClusterStatusPublisher.Publisher.class);

    if (shouldPublish) {
      if (publisherClass == null) {
        LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
            ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS +
            " is not set - not publishing status");
      } else {
        clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
        getChoreService().scheduleChore(clusterStatusPublisherChore);
      }
    }

    // Some unit tests don't need a cluster, so no zookeeper at all
    if (!conf.getBoolean("hbase.testing.nocluster", false)) {
      setInitLatch(new CountDownLatch(1));
      activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this);
      int infoPort = putUpJettyServer();
      startActiveMasterManager(infoPort);
    } else {
      activeMasterManager = null;
    }
  }

  // Returns the actual infoPort; -1 means the info server is disabled.
  private int putUpJettyServer() throws IOException {
    if (!conf.getBoolean("hbase.master.infoserver.redirect", true)) {
      return -1;
    }
    int infoPort = conf.getInt("hbase.master.info.port.orig",
      HConstants.DEFAULT_MASTER_INFOPORT);
    // -1 is for disabling info server, so no redirecting
    if (infoPort < 0 || infoServer == null) {
      return -1;
    }
    String addr = conf.get("hbase.master.info.bindAddress", "0.0.0.0");
    if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
      String msg =
          "Failed to start redirecting jetty server. Address " + addr
              + " does not belong to this host. Correct configuration parameter: "
              + "hbase.master.info.bindAddress";
      LOG.error(msg);
      throw new IOException(msg);
    }

    RedirectServlet.regionServerInfoPort = infoServer.getPort();
    if (RedirectServlet.regionServerInfoPort == infoPort) {
      return infoPort;
    }
    masterJettyServer = new org.mortbay.jetty.Server();
    Connector connector = new SelectChannelConnector();
    connector.setHost(addr);
    connector.setPort(infoPort);
    masterJettyServer.addConnector(connector);
    masterJettyServer.setStopAtShutdown(true);
    Context context = new Context(masterJettyServer, "/", Context.NO_SESSIONS);
    context.addServlet(RedirectServlet.class, "/*");
    try {
      masterJettyServer.start();
    } catch (Exception e) {
      throw new IOException("Failed to start redirecting jetty server", e);
    }
    return connector.getLocalPort();
  }

  @Override
  protected TableDescriptors getFsTableDescriptors() throws IOException {
    return super.getFsTableDescriptors();
  }

  /**
   * For compatibility: if login with the regionserver credentials fails, fall back
   * to the master credentials.
   */
  @Override
  protected void login(UserProvider user, String host) throws IOException {
    try {
      super.login(user, host);
    } catch (IOException ie) {
      user.login("hbase.master.keytab.file",
        "hbase.master.kerberos.principal", host);
    }
  }

  /**
   * If configured to put regions on the active master,
   * wait until this master becomes the active one.
   * Otherwise, loop till the server is stopped or aborted.
   */
  @Override
  protected void waitForMasterActive() {
    boolean tablesOnMaster = BaseLoadBalancer.tablesOnMaster(conf);
    while (!(tablesOnMaster && isActiveMaster)
        && !isStopped() && !isAborted()) {
      sleeper.sleep();
    }
  }

  @VisibleForTesting
  public MasterRpcServices getMasterRpcServices() {
    return (MasterRpcServices)rpcServices;
  }

  public boolean balanceSwitch(final boolean b) throws IOException {
    return getMasterRpcServices().switchBalancer(b, BalanceSwitchMode.ASYNC);
  }

  @Override
  protected String getProcessName() {
    return MASTER;
  }

  @Override
  protected boolean canCreateBaseZNode() {
    return true;
  }

  @Override
  protected boolean canUpdateTableDescriptor() {
    return true;
  }

  @Override
  protected RSRpcServices createRpcServices() throws IOException {
    return new MasterRpcServices(this);
  }

  @Override
  protected void configureInfoServer() {
    infoServer.addServlet("master-status", "/master-status", MasterStatusServlet.class);
    infoServer.setAttribute(MASTER, this);
    if (BaseLoadBalancer.tablesOnMaster(conf)) {
      super.configureInfoServer();
    }
  }

  @Override
  protected Class<? extends HttpServlet> getDumpServlet() {
    return MasterDumpServlet.class;
  }

  /**
   * Emit the HMaster metrics, such as region in transition metrics.
   * Wrapped in a try block to be sure metrics never abort the HMaster.
   */
  private void doMetrics() {
    try {
      if (assignmentManager != null) {
        assignmentManager.updateRegionsInTransitionMetrics();
      }
    } catch (Throwable e) {
      LOG.error("Couldn't update metrics: " + e.getMessage());
    }
  }

  MetricsMaster getMasterMetrics() {
    return metricsMaster;
  }

  /**
   * Initialize all ZK based system trackers.
   */
  void initializeZKBasedSystemTrackers() throws IOException,
      InterruptedException, KeeperException, CoordinatedStateException {
    this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
    this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
    this.normalizer.setMasterServices(this);
    this.normalizer.setMasterRpcServices((MasterRpcServices)rpcServices);
    this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
    this.loadBalancerTracker.start();

    this.regionNormalizerTracker = new RegionNormalizerTracker(zooKeeper, this);
    this.regionNormalizerTracker.start();

    this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
    this.splitOrMergeTracker.start();

    this.assignmentManager = new AssignmentManager(this, serverManager,
      this.balancer, this.service, this.metricsMaster,
      this.tableLockManager, tableStateManager);

    this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager);
    this.regionServerTracker.start();

    this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
    this.drainingServerTracker.start();

    this.maintenanceModeTracker = new MasterMaintenanceModeTracker(zooKeeper);
    this.maintenanceModeTracker.start();

    // Set the cluster as up.  If new RSs, they'll be waiting on this before
    // going ahead with their startup.
    boolean wasUp = this.clusterStatusTracker.isClusterUp();
    if (!wasUp) this.clusterStatusTracker.setClusterUp();

    LOG.info("Server active/primary master=" + this.serverName +
        ", sessionid=0x" +
        Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
        ", setting cluster-up flag (Was=" + wasUp + ")");

    // create/initialize the snapshot manager and other procedure managers
    this.snapshotManager = new SnapshotManager();
    this.mpmHost = new MasterProcedureManagerHost();
    this.mpmHost.register(this.snapshotManager);
    this.mpmHost.register(new MasterFlushTableProcedureManager());
    this.mpmHost.loadProcedures(conf);
    this.mpmHost.initialize(this, this.metricsMaster);
  }

  /**
   * Finish initialization of HMaster after becoming the primary master.
   *
   * <ol>
   * <li>Initialize master components - file system manager, server manager,
   *     assignment manager, region server tracker, etc</li>
   * <li>Start necessary service threads - balancer, catalog janitor,
   *     executor services, etc</li>
   * <li>Set cluster as UP in ZooKeeper</li>
   * <li>Wait for RegionServers to check-in</li>
   * <li>Split logs and perform data recovery, if necessary</li>
   * <li>Ensure assignment of meta/namespace regions</li>
   * <li>Handle either fresh cluster start or master failover</li>
   * </ol>
   */
  private void finishActiveMasterInitialization(MonitoredTask status)
      throws IOException, InterruptedException, KeeperException, CoordinatedStateException {

    isActiveMaster = true;
    Thread zombieDetector = new Thread(new InitializationMonitor(this));
    zombieDetector.start();

    /*
     * We are active master now... go initialize components we need to run.
     * Note, there may be dross in zk from previous runs; it'll get addressed
     * below after we determine if cluster startup or failover.
     */

    status.setStatus("Initializing Master file system");

    this.masterActiveTime = System.currentTimeMillis();
    // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
    this.fileSystemManager = new MasterFileSystem(this);
    this.walManager = new MasterWalManager(this);

    // enable table descriptors cache
    this.tableDescriptors.setCacheOn();
    // set the META's descriptor to the correct replication
    this.tableDescriptors.get(TableName.META_TABLE_NAME).setRegionReplication(
        conf.getInt(HConstants.META_REPLICAS_NUM, HConstants.DEFAULT_META_REPLICA_NUM));
    // warm-up HTDs cache on master initialization
    if (preLoadTableDescriptors) {
      status.setStatus("Pre-loading table descriptors");
      this.tableDescriptors.getAll();
    }

    // publish cluster ID
    status.setStatus("Publishing Cluster ID in ZooKeeper");
    ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
    this.initLatch.countDown();

    this.serverManager = createServerManager(this);

    // Invalidate all write locks held previously
    this.tableLockManager.reapWriteLocks();
    this.tableStateManager = new TableStateManager(this);

    status.setStatus("Initializing ZK system trackers");
    initializeZKBasedSystemTrackers();

    // This is for backwards compatibility
    // See HBASE-11393
    status.setStatus("Update TableCFs node in ZNode");
    TableCFsUpdater tableCFsUpdater = new TableCFsUpdater(zooKeeper,
            conf, this.clusterConnection);
    tableCFsUpdater.update();

    // initialize master side coprocessors before we start handling requests
    status.setStatus("Initializing master coprocessors");
    this.cpHost = new MasterCoprocessorHost(this, this.conf);

    // start up all service threads.
    status.setStatus("Initializing master service threads");
    startServiceThreads();

    // Wake up this server to check in
    sleeper.skipSleepCycle();

    // Wait for region servers to report in
    status.setStatus("Wait for region servers to report in");
    waitForRegionServers(status);

    // get a list for previously failed RS which need log splitting work
    // we recover hbase:meta region servers inside master initialization and
    // handle other failed servers in SSH in order to start up master node ASAP
    MasterMetaBootstrap metaBootstrap = createMetaBootstrap(this, status);
    metaBootstrap.splitMetaLogsBeforeAssignment();

    this.initializationBeforeMetaAssignment = true;

    // Wait for regionserver to finish initialization.
    if (BaseLoadBalancer.tablesOnMaster(conf)) {
      waitForServerOnline();
    }

    // initialize load balancer
    this.balancer.setClusterStatus(getClusterStatus());
    this.balancer.setMasterServices(this);
    this.balancer.initialize();

    // Check if master is shutting down because of some issue
    // in initializing the regionserver or the balancer.
    if (isStopped()) return;

    // Make sure meta assigned before proceeding.
    status.setStatus("Assigning Meta Region");
    metaBootstrap.assignMeta();

    // Check if master is shutting down, because the above assignMeta could return
    // even if hbase:meta isn't assigned when master is shutting down
    if (isStopped()) return;

    // Migrate existing table state from zk, so splitters
    // and the recovery process treat states properly.
    for (Map.Entry<TableName, TableState.State> entry : ZKDataMigrator
        .queryForTableStates(getZooKeeper()).entrySet()) {
      LOG.info("Converting state from zk to new states:" + entry);
      tableStateManager.setTableState(entry.getKey(), entry.getValue());
    }
    ZKUtil.deleteChildrenRecursively(getZooKeeper(), getZooKeeper().tableZNode);

    status.setStatus("Submitting log splitting work for previously failed region servers");
    metaBootstrap.processDeadServers();

    // Fix up assignment manager status
    status.setStatus("Starting assignment manager");
    this.assignmentManager.joinCluster();

    // set cluster status again after user regions are assigned
    this.balancer.setClusterStatus(getClusterStatus());

    // Start balancer and meta catalog janitor after meta and regions have been assigned.
    status.setStatus("Starting balancer and catalog janitor");
    this.clusterStatusChore = new ClusterStatusChore(this, balancer);
    getChoreService().scheduleChore(clusterStatusChore);
    this.balancerChore = new BalancerChore(this);
    getChoreService().scheduleChore(balancerChore);
    this.normalizerChore = new RegionNormalizerChore(this);
    getChoreService().scheduleChore(normalizerChore);
    this.catalogJanitorChore = new CatalogJanitor(this, this);
    getChoreService().scheduleChore(catalogJanitorChore);

    // Do Metrics periodically
    periodicDoMetricsChore = new PeriodicDoMetrics(msgInterval, this);
    getChoreService().scheduleChore(periodicDoMetricsChore);

    status.setStatus("Starting cluster schema service");
    initClusterSchemaService();

    if (this.cpHost != null) {
      try {
        this.cpHost.preMasterInitialization();
      } catch (IOException e) {
        LOG.error("Coprocessor preMasterInitialization() hook failed", e);
      }
    }

    status.markComplete("Initialization successful");
    LOG.info("Master has completed initialization");
    configurationManager.registerObserver(this.balancer);

    // Set master as 'initialized'.
    setInitialized(true);

    status.setStatus("Assign meta replicas");
    metaBootstrap.assignMetaReplicas();

    status.setStatus("Starting quota manager");
    initQuotaManager();

    // Clear the dead servers with same host name and port of online server because we are not
    // removing dead server with same hostname and port of rs which is trying to check in before
    // master initialization. See HBASE-5916.
    this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();

    // Check and set the znode ACLs if needed in case we are overtaking a non-secure configuration
    status.setStatus("Checking ZNode ACLs");
    zooKeeper.checkAndSetZNodeAcls();

    status.setStatus("Initializing MOB Cleaner");
    initMobCleaner();

    status.setStatus("Calling postStartMaster coprocessors");
    if (this.cpHost != null) {
      // don't let cp initialization errors kill the master
      try {
        this.cpHost.postStartMaster();
      } catch (IOException ioe) {
        LOG.error("Coprocessor postStartMaster() hook failed", ioe);
      }
    }

    zombieDetector.interrupt();
  }

  private void initMobCleaner() {
    this.expiredMobFileCleanerChore = new ExpiredMobFileCleanerChore(this);
    getChoreService().scheduleChore(expiredMobFileCleanerChore);

    int mobCompactionPeriod = conf.getInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD,
        MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD);
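    // The period is in seconds; a value of 0 or less disables the compaction chore
    // entirely (e.g., an illustrative 604800 would schedule a weekly run).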
    if (mobCompactionPeriod > 0) {
      this.mobCompactChore = new MobCompactionChore(this, mobCompactionPeriod);
      getChoreService().scheduleChore(mobCompactChore);
    } else {
      LOG.info("The period is " + mobCompactionPeriod + " seconds, MobCompactionChore is disabled");
    }
    this.mobCompactThread = new MasterMobCompactionThread(this);
  }

  /**
   * Create a {@link MasterMetaBootstrap} instance.
   */
  MasterMetaBootstrap createMetaBootstrap(final HMaster master, final MonitoredTask status) {
    // We put this out here in a method so we can do a Mockito.spy and stub it out
    // w/ a mocked up MasterMetaBootstrap.
    return new MasterMetaBootstrap(master, status);
  }

  /**
   * Create a {@link ServerManager} instance.
   */
  ServerManager createServerManager(final MasterServices master) throws IOException {
    // We put this out here in a method so we can do a Mockito.spy and stub it out
    // w/ a mocked up ServerManager.
    setupClusterConnection();
    return new ServerManager(master);
  }

  private void waitForRegionServers(final MonitoredTask status)
      throws IOException, InterruptedException {
    this.serverManager.waitForRegionServers(status);
    // Check zk for region servers that are up but didn't register
    for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
      // The isServerOnline check is opportunistic, correctness is handled inside
      if (!this.serverManager.isServerOnline(sn)
          && serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
        LOG.info("Registered server found up in zk but which has not yet reported in: " + sn);
      }
    }
  }

  void initClusterSchemaService() throws IOException, InterruptedException {
    this.clusterSchemaService = new ClusterSchemaServiceImpl(this);
    this.clusterSchemaService.startAndWait();
    if (!this.clusterSchemaService.isRunning()) throw new HBaseIOException("Failed start");
  }

  void initQuotaManager() throws IOException {
    MasterQuotaManager quotaManager = new MasterQuotaManager(this);
    this.assignmentManager.setRegionStateListener((RegionStateListener)quotaManager);
    quotaManager.start();
    this.quotaManager = quotaManager;
  }

  boolean isCatalogJanitorEnabled() {
    return catalogJanitorChore != null ?
      catalogJanitorChore.getEnabled() : false;
  }

  @Override
  public TableDescriptors getTableDescriptors() {
    return this.tableDescriptors;
  }

  @Override
  public ServerManager getServerManager() {
    return this.serverManager;
  }

  @Override
  public MasterFileSystem getMasterFileSystem() {
    return this.fileSystemManager;
  }

  @Override
  public MasterWalManager getMasterWalManager() {
    return this.walManager;
  }

  @Override
  public TableStateManager getTableStateManager() {
    return tableStateManager;
  }

  /*
   * Start up all services. If any of these threads gets an unhandled exception
   * then they just die with a logged message.  This should be fine because
   * in general, we do not expect the master to get such unhandled exceptions
   * as OOMEs; it should be lightly loaded. See what HRegionServer does if
   * you need to install an unhandled exception handler.
   */
  private void startServiceThreads() throws IOException {
    // Start the executor service pools
    this.service.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
      conf.getInt("hbase.master.executor.openregion.threads", 5));
    this.service.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
      conf.getInt("hbase.master.executor.closeregion.threads", 5));
    this.service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
      conf.getInt("hbase.master.executor.serverops.threads", 5));
    this.service.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
      conf.getInt("hbase.master.executor.meta.serverops.threads", 5));
    this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
      conf.getInt("hbase.master.executor.logreplayops.threads", 10));

    // We depend on there being only one instance of this executor running
    // at a time.  To do concurrency, would need fencing of enable/disable of
    // tables.
    // Any time changing this maxThreads to > 1, pls see the comment at
    // AccessController#postCompletedCreateTableAction
    this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
    startProcedureExecutor();

    // Start log cleaner thread
    int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
    this.logCleaner =
      new LogCleaner(cleanerInterval,
        this, conf, getMasterWalManager().getFileSystem(),
        getMasterWalManager().getOldLogDir());
    getChoreService().scheduleChore(logCleaner);

    // start the hfile archive cleaner thread
    Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
    this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
        .getFileSystem(), archiveDir);
    getChoreService().scheduleChore(hfileCleaner);
    serviceStarted = true;
    if (LOG.isTraceEnabled()) {
      LOG.trace("Started service threads");
    }
    if (conf.getClass("hbase.region.replica.replication.replicationQueues.class",
        ReplicationFactory.defaultReplicationQueueClass) == ReplicationQueuesZKImpl.class && !conf
        .getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
      try {
        replicationZKLockCleanerChore = new ReplicationZKLockCleanerChore(this, this,
            cleanerInterval, this.getZooKeeper(), this.conf);
        getChoreService().scheduleChore(replicationZKLockCleanerChore);
      } catch (Exception e) {
        LOG.error("start replicationZKLockCleanerChore failed", e);
      }
    }
    replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval);
    getChoreService().scheduleChore(replicationMetaCleaner);
  }

  @Override
  protected void sendShutdownInterrupt() {
    super.sendShutdownInterrupt();
    stopProcedureExecutor();
  }

  @Override
  protected void stopServiceThreads() {
    if (masterJettyServer != null) {
      LOG.info("Stopping master jetty server");
      try {
        masterJettyServer.stop();
      } catch (Exception e) {
        LOG.error("Failed to stop master jetty server", e);
      }
    }
    super.stopServiceThreads();
    stopChores();

    // Wait for all the remaining region servers to report in IFF we were
    // running a cluster shutdown AND we were NOT aborting.
    if (!isAborted() && this.serverManager != null &&
        this.serverManager.isClusterShutdown()) {
      this.serverManager.letRegionServersShutdown();
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Stopping service threads");
    }
    // Clean up and close up shop
    if (this.logCleaner != null) this.logCleaner.cancel(true);
    if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
    if (this.replicationZKLockCleanerChore != null) this.replicationZKLockCleanerChore.cancel(true);
    if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true);
    if (this.quotaManager != null) this.quotaManager.stop();
    if (this.activeMasterManager != null) this.activeMasterManager.stop();
    if (this.serverManager != null) this.serverManager.stop();
    if (this.assignmentManager != null) this.assignmentManager.stop();
    if (this.walManager != null) this.walManager.stop();
    if (this.fileSystemManager != null) this.fileSystemManager.stop();
    if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
  }

  private void startProcedureExecutor() throws IOException {
    final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
    final Path logDir = new Path(fileSystemManager.getRootDir(),
        MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);

    procedureStore = new WALProcedureStore(conf, fileSystemManager.getFileSystem(), logDir,
        new MasterProcedureEnv.WALStoreLeaseRecovery(this));
    procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
    procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore,
        procEnv.getProcedureQueue());

    final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
        Math.max(Runtime.getRuntime().availableProcessors(),
          MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
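    // e.g. on an 8-core host with MASTER_PROCEDURE_THREADS unset, this resolves to
    // max(8, DEFAULT_MIN_MASTER_PROCEDURE_THREADS) worker threads (core count illustrative)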
    final boolean abortOnCorruption = conf.getBoolean(
        MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
        MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
    procedureStore.start(numThreads);
    procedureExecutor.start(numThreads, abortOnCorruption);
  }

  private void stopProcedureExecutor() {
    if (procedureExecutor != null) {
      procedureExecutor.stop();
    }

    if (procedureStore != null) {
      procedureStore.stop(isAborted());
    }
  }

  private void stopChores() {
    if (this.expiredMobFileCleanerChore != null) {
      this.expiredMobFileCleanerChore.cancel(true);
    }
    if (this.mobCompactChore != null) {
      this.mobCompactChore.cancel(true);
    }
    if (this.balancerChore != null) {
      this.balancerChore.cancel(true);
    }
    if (this.normalizerChore != null) {
      this.normalizerChore.cancel(true);
    }
    if (this.clusterStatusChore != null) {
      this.clusterStatusChore.cancel(true);
    }
    if (this.catalogJanitorChore != null) {
      this.catalogJanitorChore.cancel(true);
    }
    if (this.clusterStatusPublisherChore != null) {
      clusterStatusPublisherChore.cancel(true);
    }
    if (this.mobCompactThread != null) {
      this.mobCompactThread.close();
    }

    if (this.periodicDoMetricsChore != null) {
      periodicDoMetricsChore.cancel();
    }
  }

  /**
   * @return the remote side's InetAddress
   */
  InetAddress getRemoteInetAddress(final int port,
      final long serverStartCode) throws UnknownHostException {
    // Do it out here in its own little method so we can fake an address when
    // mocking up in tests.
    InetAddress ia = RpcServer.getRemoteIp();

    // The call could be from the local regionserver,
    // in which case, there is no remote address.
    if (ia == null && serverStartCode == startcode) {
      InetSocketAddress isa = rpcServices.getSocketAddress();
      if (isa != null && isa.getPort() == port) {
        ia = isa.getAddress();
      }
    }
    return ia;
  }

  /**
   * @return Maximum time we should run balancer for
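   *
   * <p>Worked example: with {@code hbase.balancer.max.balancing} unset (-1) and
   * {@code hbase.balancer.period} left at its 300000 ms default, each balancer
   * invocation may keep executing region plans for up to five minutes.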
   */
  private int getBalancerCutoffTime() {
    int balancerCutoffTime = getConfiguration().getInt("hbase.balancer.max.balancing", -1);
    if (balancerCutoffTime == -1) {
      // if cutoff time isn't set, default it to the balancer period
      int balancerPeriod = getConfiguration().getInt("hbase.balancer.period", 300000);
      balancerCutoffTime = balancerPeriod;
    }
    return balancerCutoffTime;
  }

  public boolean balance() throws IOException {
    return balance(false);
  }

  public boolean balance(boolean force) throws IOException {
    // if master not initialized, don't run balancer.
    if (!isInitialized()) {
      LOG.debug("Master has not been initialized, don't run balancer.");
      return false;
    }

    if (isInMaintenanceMode()) {
      LOG.info("Master is in maintenance mode, don't run balancer.");
      return false;
    }

    // Do this call outside of synchronized block.
    int maximumBalanceTime = getBalancerCutoffTime();
    synchronized (this.balancer) {
      // If the balancer is switched off, don't run it.
      if (!this.loadBalancerTracker.isBalancerOn()) return false;
      // Only allow one balance run at a time.
      if (this.assignmentManager.getRegionStates().isRegionsInTransition()) {
        Set<RegionState> regionsInTransition =
          this.assignmentManager.getRegionStates().getRegionsInTransition();
        // if hbase:meta region is in transition, result of assignment cannot be recorded
        // ignore the force flag in that case
        boolean metaInTransition = assignmentManager.getRegionStates().isMetaRegionInTransition();
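        // The prefix yields either "Running balancer because ..." (forced, meta not in
        // transition) or "Not running balancer because ..." in the log line below.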
        String prefix = force && !metaInTransition ? "R" : "Not r";
        LOG.debug(prefix + "unning balancer because " + regionsInTransition.size() +
          " region(s) in transition: " + org.apache.commons.lang.StringUtils.
            abbreviate(regionsInTransition.toString(), 256));
        if (!force || metaInTransition) return false;
      }
      if (this.serverManager.areDeadServersInProgress()) {
        LOG.debug("Not running balancer because processing dead regionserver(s): " +
          this.serverManager.getDeadServers());
        return false;
      }

      if (this.cpHost != null) {
        try {
          if (this.cpHost.preBalance()) {
            LOG.debug("Coprocessor bypassing balancer request");
            return false;
          }
        } catch (IOException ioe) {
          LOG.error("Error invoking master coprocessor preBalance()", ioe);
          return false;
        }
      }

      Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable =
        this.assignmentManager.getRegionStates().getAssignmentsByTable();

      List<RegionPlan> plans = new ArrayList<RegionPlan>();

      // Give the balancer the current cluster state.
      this.balancer.setClusterStatus(getClusterStatus());
      for (Entry<TableName, Map<ServerName, List<HRegionInfo>>> e : assignmentsByTable.entrySet()) {
        List<RegionPlan> partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue());
        if (partialPlans != null) plans.addAll(partialPlans);
      }

      long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
      int rpCount = 0;  // number of RegionPlans balanced so far
      long totalRegPlanExecTime = 0;
      if (plans != null && !plans.isEmpty()) {
        for (RegionPlan plan: plans) {
          LOG.info("balance " + plan);
          long balStartTime = System.currentTimeMillis();
          // TODO: bulk assign
          this.assignmentManager.balance(plan);
          totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;
          rpCount++;
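          // Predictive cutoff: if the average plan-execution time so far suggests the
          // next move would overrun cutoffTime, stop early. E.g. (illustrative numbers)
          // after 3 plans taking 9s total, a 4th move is skipped when fewer than 3s
          // remain before the cutoff.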
          if (rpCount < plans.size() &&
              // if performing next balance exceeds cutoff time, exit the loop
              (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
            // TODO: After balance, there should not be a cutoff time (keeping it as
            // a safety net for now)
            LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
              maximumBalanceTime);
            break;
          }
        }
      }
      if (this.cpHost != null) {
        try {
          this.cpHost.postBalance(rpCount < plans.size() ? plans.subList(0, rpCount) : plans);
        } catch (IOException ioe) {
          // balancing already succeeded so don't change the result
          LOG.error("Error invoking master coprocessor postBalance()", ioe);
        }
      }
    }
    // If the LoadBalancer did not generate any plans, it means the cluster is already balanced.
    // Return true indicating success.
    return true;
  }

  @Override
  @VisibleForTesting
  public RegionNormalizer getRegionNormalizer() {
    return this.normalizer;
  }

  /**
   * Perform normalization of cluster (invoked by {@link RegionNormalizerChore}).
   *
   * @return true if the normalization step was performed successfully, false otherwise
   *    (specifically, if HMaster hasn't been initialized properly or normalization
   *    is globally disabled)
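   *
   * <p>Normalization only touches tables that opt in. A hedged sketch of opting a
   * table in through the client API (assumes a connected {@code Admin} instance):
   * <pre>
   * HTableDescriptor htd = admin.getTableDescriptor(tableName);
   * htd.setNormalizationEnabled(true);
   * admin.modifyTable(tableName, htd);
   * </pre>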
1247    */
1248   public boolean normalizeRegions() throws IOException {
1249     if (!isInitialized()) {
1250       LOG.debug("Master has not been initialized, don't run region normalizer.");
1251       return false;
1252     }
1253
1254     if (isInMaintenanceMode()) {
1255       LOG.info("Master is in maintenance mode, don't run region normalizer.");
1256       return false;
1257     }
1258
1259     if (!this.regionNormalizerTracker.isNormalizerOn()) {
1260       LOG.debug("Region normalization is disabled, don't run region normalizer.");
1261       return false;
1262     }
1263
1264     synchronized (this.normalizer) {
1265       // Don't run the normalizer concurrently
1266       List<TableName> allEnabledTables = new ArrayList<>(
1267         this.tableStateManager.getTablesInStates(TableState.State.ENABLED));
1268
1269       Collections.shuffle(allEnabledTables);
1270
1271       for (TableName table : allEnabledTables) {
1272         if (isInMaintenanceMode()) {
1273           LOG.debug("Master is in maintenance mode, stop running region normalizer.");
1274           return false;
1275         }
1276
1277         HTableDescriptor tblDesc = getTableDescriptors().get(table);
1278         if (table.isSystemTable() || (tblDesc != null &&
1279             !tblDesc.isNormalizationEnabled())) {
1280           LOG.debug("Skipping normalization for table: " + table + ", as it's either a"
1281               + " system table or doesn't have normalization enabled");
1282           continue;
1283         }
1284         List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table);
1285         if (plans != null) {
1286           for (NormalizationPlan plan : plans) {
1287             plan.execute(clusterConnection.getAdmin());
1288             if (plan.getType() == PlanType.SPLIT) {
1289               splitPlanCount++;
1290             } else if (plan.getType() == PlanType.MERGE) {
1291               mergePlanCount++;
1292             }
1293           }
1294         }
1295       }
1296     }
1297     // If the normalizer did not generate any plans, the cluster is already in a
1298     // normalized state. Return true indicating success.
1299     return true;
1300   }
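
       /*
        * A minimal client-side sketch of driving the normalizer (illustrative only; assumes a
        * reachable cluster and the standard Admin client API):
        * <pre>
        * Configuration conf = HBaseConfiguration.create();
        * try (Connection conn = ConnectionFactory.createConnection(conf);
        *      Admin admin = conn.getAdmin()) {
        *   admin.setNormalizerRunning(true); // flips the switch consulted above via ZK
        *   boolean ran = admin.normalize();  // asks the master to run normalizeRegions() once
        * }
        * </pre>
        * Even with the global switch on, normalizeRegions() still skips system tables and
        * tables whose descriptor has normalization disabled.
        */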
1301
1302   /**
1303    * @return Client info for use as a prefix on an audit log string; identifies who performed the action
1304    */
1305   String getClientIdAuditPrefix() {
1306     return "Client=" + RpcServer.getRequestUserName() + "/" + RpcServer.getRemoteAddress();
1307   }
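
       // For illustration: the prefix built above renders as e.g. "Client=alice/10.0.0.7"
       // (hypothetical user and address) and is prepended to the audit LOG.info lines emitted
       // by the admin operations in this class.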
1308
1309   /**
1310    * Switch for the background CatalogJanitor thread.
1311    * Used for testing.  The thread will continue to run.  It will just be a noop
1312    * if disabled.
1313    * @param b If false, the catalog janitor won't do anything.
1314    */
1315   public void setCatalogJanitorEnabled(final boolean b) {
1316     this.catalogJanitorChore.setEnabled(b);
1317   }
1318
1319   @Override
1320   public long dispatchMergingRegions(
1321       final HRegionInfo regionInfoA,
1322       final HRegionInfo regionInfoB,
1323       final boolean forcible,
1324       final long nonceGroup,
1325       final long nonce) throws IOException {
1326     checkInitialized();
1327
1328     TableName tableName = regionInfoA.getTable();
1329     if (tableName == null || regionInfoB.getTable() == null) {
1330       throw new UnknownRegionException("Can't merge regions that have no associated table");
1331     }
1332
1333     if (!tableName.equals(regionInfoB.getTable())) {
1334       throw new IOException("Cannot merge regions from two different tables");
1335     }
1336
1337     if (regionInfoA.compareTo(regionInfoB) == 0) {
1338       throw new MergeRegionException(
1339         "Cannot merge a region with itself: " + regionInfoA + ", " + regionInfoB);
1340     }
1341
1342     HRegionInfo[] regionsToMerge = new HRegionInfo[2];
1343     regionsToMerge[0] = regionInfoA;
1344     regionsToMerge[1] = regionInfoB;
1345
1346     if (cpHost != null) {
1347       cpHost.preDispatchMerge(regionInfoA, regionInfoB);
1348     }
1349
1350     LOG.info(getClientIdAuditPrefix() + " Merge regions "
1351         + regionInfoA.getEncodedName() + " and " + regionInfoB.getEncodedName());
1352
1353     long procId = this.procedureExecutor.submitProcedure(
1354       new DispatchMergingRegionsProcedure(
1355         procedureExecutor.getEnvironment(), tableName, regionsToMerge, forcible),
1356       nonceGroup,
1357       nonce);
1358
1359     if (cpHost != null) {
1360       cpHost.postDispatchMerge(regionInfoA, regionInfoB);
1361     }
1362     return procId;
1363   }
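
       /*
        * A hedged client-side sketch of requesting a merge (illustrative only; the arguments
        * are placeholders for encoded region names of adjacent regions of one table):
        * <pre>
        * try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
        *      Admin admin = conn.getAdmin()) {
        *   admin.mergeRegions(encodedNameOfRegionA, encodedNameOfRegionB, false); // forcible=false
        * }
        * </pre>
        */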
1364
1365   void move(final byte[] encodedRegionName,
1366       final byte[] destServerName) throws HBaseIOException {
1367     RegionState regionState = assignmentManager.getRegionStates().
1368       getRegionState(Bytes.toString(encodedRegionName));
1369
1370     HRegionInfo hri;
1371     if (regionState != null) {
1372       hri = regionState.getRegion();
1373     } else {
1374       throw new UnknownRegionException(Bytes.toStringBinary(encodedRegionName));
1375     }
1376
1377     ServerName dest;
1378     if (destServerName == null || destServerName.length == 0) {
1379       LOG.info("Passed destination servername is null/empty so " +
1380         "choosing a server at random");
1381       final List<ServerName> destServers = this.serverManager.createDestinationServersList(
1382         regionState.getServerName());
1383       dest = balancer.randomAssignment(hri, destServers);
1384       if (dest == null) {
1385         LOG.debug("Unable to determine a plan to assign " + hri);
1386         return;
1387       }
1388     } else {
1389       ServerName candidate = ServerName.valueOf(Bytes.toString(destServerName));
1390       dest = balancer.randomAssignment(hri, Lists.newArrayList(candidate));
1391       if (dest == null) {
1392         LOG.debug("Unable to determine a plan to assign " + hri);
1393         return;
1394       }
1395       if (dest.equals(serverName) && balancer instanceof BaseLoadBalancer
1396           && !((BaseLoadBalancer)balancer).shouldBeOnMaster(hri)) {
1397         // Don't put user regions on the master, to avoid the balancer having to move
1398         // them off again later. A test may still intentionally place regions on the
1399         // master, though.
1400         LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1401           + " to avoid unnecessary region moving later by load balancer,"
1402           + " because it should not be on master");
1403         return;
1404       }
1405     }
1406
1407     if (dest.equals(regionState.getServerName())) {
1408       LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1409         + " because region already assigned to the same server " + dest + ".");
1410       return;
1411     }
1412
1413     // Now we can do the move
1414     RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest);
1415
1416     try {
1417       checkInitialized();
1418       if (this.cpHost != null) {
1419         if (this.cpHost.preMove(hri, rp.getSource(), rp.getDestination())) {
1420           return;
1421         }
1422       }
1423       // Warm up the region on the destination before initiating the move. This call
1424       // is synchronous and takes some time, so do it before the source region gets
1425       // closed.
1426       serverManager.sendRegionWarmup(rp.getDestination(), hri);
1427
1428       LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
1429       this.assignmentManager.balance(rp);
1430       if (this.cpHost != null) {
1431         this.cpHost.postMove(hri, rp.getSource(), rp.getDestination());
1432       }
1433     } catch (IOException ioe) {
1434       if (ioe instanceof HBaseIOException) {
1435         throw (HBaseIOException)ioe;
1436       }
1437       throw new HBaseIOException(ioe);
1438     }
1439   }
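
       /*
        * A minimal sketch of triggering the move above from a client (illustrative only).
        * Passing a null destination lets the balancer pick a random server, as handled at the
        * top of move(); the explicit server name below is a made-up host,port,startcode example.
        * <pre>
        * try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
        *      Admin admin = conn.getAdmin()) {
        *   admin.move(encodedRegionName, null); // random destination
        *   admin.move(encodedRegionName, Bytes.toBytes("host1,16020,1600000000000"));
        * }
        * </pre>
        */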
1440
1441   @Override
1442   public long createTable(
1443       final HTableDescriptor hTableDescriptor,
1444       final byte [][] splitKeys,
1445       final long nonceGroup,
1446       final long nonce) throws IOException {
1447     if (isStopped()) {
1448       throw new MasterNotRunningException();
1449     }
1450     checkInitialized();
1451     String namespace = hTableDescriptor.getTableName().getNamespaceAsString();
1452     this.clusterSchemaService.getNamespace(namespace);
1453
1454     HRegionInfo[] newRegions = ModifyRegionUtils.createHRegionInfos(hTableDescriptor, splitKeys);
1455
1456     sanityCheckTableDescriptor(hTableDescriptor);
1457     if (cpHost != null) {
1458       cpHost.preCreateTable(hTableDescriptor, newRegions);
1459     }
1460     LOG.info(getClientIdAuditPrefix() + " create " + hTableDescriptor);
1461
1462     // TODO: We can handle/merge duplicate requests, and differentiate the case of
1463     //       TableExistsException by saying if the schema is the same or not.
1464     ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
1465     long procId = this.procedureExecutor.submitProcedure(
1466       new CreateTableProcedure(
1467         procedureExecutor.getEnvironment(), hTableDescriptor, newRegions, latch),
1468       nonceGroup,
1469       nonce);
1470     latch.await();
1471
1472     if (cpHost != null) {
1473       cpHost.postCreateTable(hTableDescriptor, newRegions);
1474     }
1475
1476     return procId;
1477   }
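
       /*
        * A usage sketch (illustrative only; table and family names are hypothetical) creating
        * a pre-split table through the Admin API, which lands in createTable() above:
        * <pre>
        * HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("my_table"));
        * htd.addFamily(new HColumnDescriptor("cf"));
        * byte[][] splitKeys = { Bytes.toBytes("m") }; // two regions: (-inf, m) and [m, +inf)
        * try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
        *      Admin admin = conn.getAdmin()) {
        *   admin.createTable(htd, splitKeys);
        * }
        * </pre>
        */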
1478
1479   @Override
1480   public long createSystemTable(final HTableDescriptor hTableDescriptor) throws IOException {
1481     if (isStopped()) {
1482       throw new MasterNotRunningException();
1483     }
1484
1485     TableName tableName = hTableDescriptor.getTableName();
1486     if (!(tableName.isSystemTable())) {
1487       throw new IllegalArgumentException(
1488         "Only system table creation can use this createSystemTable API");
1489     }
1490
1491     HRegionInfo[] newRegions = ModifyRegionUtils.createHRegionInfos(hTableDescriptor, null);
1492
1493     LOG.info(getClientIdAuditPrefix() + " create " + hTableDescriptor);
1494
1495     // This special create-table call is made locally to the master. Since there is no RPC,
1496     // there is no need for a nonce to detect duplicate RPC calls.
1497     long procId = this.procedureExecutor.submitProcedure(
1498       new CreateTableProcedure(procedureExecutor.getEnvironment(), hTableDescriptor, newRegions));
1499
1500     return procId;
1501   }
1502
1503   /**
1504    * Checks whether the table conforms to some sane limits, and configured
1505    * values (compression, etc) work. Throws an exception if something is wrong.
1506    * @throws IOException
1507    */
1508   private void sanityCheckTableDescriptor(final HTableDescriptor htd) throws IOException {
1509     final String CONF_KEY = "hbase.table.sanity.checks";
1510     boolean logWarn = false;
1511     if (!conf.getBoolean(CONF_KEY, true)) {
1512       logWarn = true;
1513     }
1514     String tableVal = htd.getConfigurationValue(CONF_KEY);
1515     if (tableVal != null && !Boolean.valueOf(tableVal)) {
1516       logWarn = true;
1517     }
1518
1519     // check max file size
1520     long maxFileSizeLowerLimit = 2 * 1024 * 1024L; // 2M is the default lower limit
1521     long maxFileSize = htd.getMaxFileSize();
1522     if (maxFileSize < 0) {
1523       maxFileSize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, maxFileSizeLowerLimit);
1524     }
1525     if (maxFileSize < conf.getLong("hbase.hregion.max.filesize.limit", maxFileSizeLowerLimit)) {
1526       String message = "MAX_FILESIZE for table descriptor or "
1527           + "\"hbase.hregion.max.filesize\" (" + maxFileSize
1528           + ") is too small, which might cause over splitting into unmanageable "
1529           + "number of regions.";
1530       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1531     }
1532
1533     // check flush size
1534     long flushSizeLowerLimit = 1024 * 1024L; // 1M is the default lower limit
1535     long flushSize = htd.getMemStoreFlushSize();
1536     if (flushSize < 0) {
1537       flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeLowerLimit);
1538     }
1539     if (flushSize < conf.getLong("hbase.hregion.memstore.flush.size.limit", flushSizeLowerLimit)) {
1540       String message = "MEMSTORE_FLUSHSIZE for table descriptor or "
1541           + "\"hbase.hregion.memstore.flush.size\" (" + flushSize + ") is too small, which might cause"
1542           + " very frequent flushing.";
1543       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1544     }
1545
1546     // check that coprocessors and other specified plugin classes can be loaded
1547     try {
1548       checkClassLoading(conf, htd);
1549     } catch (Exception ex) {
1550       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, ex.getMessage(), null);
1551     }
1552
1553     // check compression can be loaded
1554     try {
1555       checkCompression(htd);
1556     } catch (IOException e) {
1557       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
1558     }
1559
1560     // check encryption can be loaded
1561     try {
1562       checkEncryption(conf, htd);
1563     } catch (IOException e) {
1564       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
1565     }
1566     // Verify compaction policy
1567     try {
1568       checkCompactionPolicy(conf, htd);
1569     } catch (IOException e) {
1570       warnOrThrowExceptionForFailure(false, CONF_KEY, e.getMessage(), e);
1571     }
1572     // check that we have at least 1 CF
1573     if (htd.getColumnFamilies().length == 0) {
1574       String message = "Table should have at least one column family.";
1575       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1576     }
1577
1578     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1579       if (hcd.getTimeToLive() <= 0) {
1580         String message = "TTL for column family " + hcd.getNameAsString() + " must be positive.";
1581         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1582       }
1583
1584       // check blockSize
1585       if (hcd.getBlocksize() < 1024 || hcd.getBlocksize() > 16 * 1024 * 1024) {
1586         String message = "Block size for column family " + hcd.getNameAsString()
1587             + " must be between 1K and 16MB.";
1588         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1589       }
1590
1591       // check versions
1592       if (hcd.getMinVersions() < 0) {
1593         String message = "Min versions for column family " + hcd.getNameAsString()
1594           + " must be non-negative.";
1595         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1596       }
1597       // max versions already being checked
1598
1599       // HBASE-13776 Setting illegal versions for HColumnDescriptor
1600       //  does not throw IllegalArgumentException
1601       // check minVersions <= maxVersions
1602       if (hcd.getMinVersions() > hcd.getMaxVersions()) {
1603         String message = "Min versions for column family " + hcd.getNameAsString()
1604             + " must not be greater than the max versions.";
1605         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1606       }
1607
1608       // check replication scope
1609       checkReplicationScope(hcd);
1610
1611       // Check the data replication factor. It can be 0 (the default) when the user has not
1612       // explicitly set it; in that case the file system's default replication factor is used.
1613       if (hcd.getDFSReplication() < 0) {
1614         String message = "HFile replication for column family " + hcd.getNameAsString()
1615             + " must be non-negative.";
1616         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1617       }
1618
1619       // TODO: should we check coprocessors and encryption ?
1620     }
1621   }
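
       /*
        * A sketch of downgrading the checks above to warnings for a single table (illustrative
        * only), using the same "hbase.table.sanity.checks" key this method reads from the table
        * descriptor; setting the key to false in hbase-site.xml has the same effect cluster-wide.
        * <pre>
        * HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("my_table"));
        * htd.addFamily(new HColumnDescriptor("cf"));
        * htd.setConfiguration("hbase.table.sanity.checks", "false"); // warn instead of throw
        * </pre>
        */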
1622
1623   private void checkReplicationScope(HColumnDescriptor hcd) throws IOException{
1624     // check replication scope
1625     WALProtos.ScopeType scope = WALProtos.ScopeType.valueOf(hcd.getScope());
1626     if (scope == null) {
1627       String message = "Replication scope for column family "
1628           + hcd.getNameAsString() + " is " + hcd.getScope() + " which is invalid.";
1629
1630       LOG.error(message);
1631       throw new DoNotRetryIOException(message);
1632     }
1633   }
1634
1635   private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd)
1636       throws IOException {
1637     // FIFO compaction has some requirements
1638     // Actually FCP ignores periodic major compactions
1639     String className =
1640         htd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
1641     if (className == null) {
1642       className =
1643           conf.get(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
1644             ExploringCompactionPolicy.class.getName());
1645     }
1646
1647     int blockingFileCount = HStore.DEFAULT_BLOCKING_STOREFILE_COUNT;
1648     String sv = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
1649     if (sv != null) {
1650       blockingFileCount = Integer.parseInt(sv);
1651     } else {
1652       blockingFileCount = conf.getInt(HStore.BLOCKING_STOREFILES_KEY, blockingFileCount);
1653     }
1654
1655     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1656       String compactionPolicy =
1657           hcd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
1658       if (compactionPolicy == null) {
1659         compactionPolicy = className;
1660       }
1661       if (!compactionPolicy.equals(FIFOCompactionPolicy.class.getName())) {
1662         continue;
1663       }
1664       // FIFOCompaction
1665       String message = null;
1666
1667       // 1. Check TTL
1668       if (hcd.getTimeToLive() == HColumnDescriptor.DEFAULT_TTL) {
1669         message = "Default TTL is not supported for FIFO compaction";
1670         throw new IOException(message);
1671       }
1672
1673       // 2. Check min versions
1674       if (hcd.getMinVersions() > 0) {
1675         message = "MIN_VERSION > 0 is not supported for FIFO compaction";
1676         throw new IOException(message);
1677       }
1678
1679       // 3. blocking file count
1680       String sbfc = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
1681       if (sbfc != null) {
1682         blockingFileCount = Integer.parseInt(sbfc);
1683       }
1684       if (blockingFileCount < 1000) {
1685         message =
1686             "blocking file count '" + HStore.BLOCKING_STOREFILES_KEY + "' " + blockingFileCount
1687                 + " is below recommended minimum of 1000";
1688         throw new IOException(message);
1689       }
1690     }
1691   }
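
       /*
        * A descriptor sketch (illustrative only) that satisfies the FIFO compaction
        * requirements enforced above: an explicit TTL, MIN_VERSIONS of 0, and a blocking
        * store file count of at least 1000.
        * <pre>
        * HColumnDescriptor hcd = new HColumnDescriptor("cf");
        * hcd.setTimeToLive(60 * 60 * 24); // non-default TTL (one day) is required for FIFO
        * hcd.setMinVersions(0);           // MIN_VERSIONS > 0 is rejected for FIFO
        * hcd.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
        *     FIFOCompactionPolicy.class.getName());
        * HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("fifo_table"));
        * htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "1000");
        * htd.addFamily(hcd);
        * </pre>
        */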
1692
1693   // HBASE-13350 - Helper method to log warning on sanity check failures if checks disabled.
1694   private static void warnOrThrowExceptionForFailure(boolean logWarn, String confKey,
1695       String message, Exception cause) throws IOException {
1696     if (!logWarn) {
1697       throw new DoNotRetryIOException(message + " Set " + confKey +
1698           " to false at conf or table descriptor if you want to bypass sanity checks", cause);
1699     }
1700     LOG.warn(message);
1701   }
1702
1703   private void startActiveMasterManager(int infoPort) throws KeeperException {
1704     String backupZNode = ZKUtil.joinZNode(
1705       zooKeeper.backupMasterAddressesZNode, serverName.toString());
1706     /*
1707      * Add a ZNode for ourselves in the backup master directory since we
1708      * may not become the active master. If so, we want the actual active
1709      * master to know we are backup masters, so that it won't assign
1710      * regions to us if so configured.
1711      *
1712      * If we become the active master later, ActiveMasterManager will delete
1713      * this node explicitly.  If we crash before then, ZooKeeper will delete
1714      * this node for us since it is ephemeral.
1715      */
1716     LOG.info("Adding backup master ZNode " + backupZNode);
1717     if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode,
1718         serverName, infoPort)) {
1719       LOG.warn("Failed create of " + backupZNode + " by " + serverName);
1720     }
1721
1722     activeMasterManager.setInfoPort(infoPort);
1723     // Start a thread to try to become the active master, so we won't block here
1724     Threads.setDaemonThreadRunning(new Thread(new Runnable() {
1725       @Override
1726       public void run() {
1727         int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT,
1728           HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
1729         // If we're a backup master, stall until the primary writes its address
1730         if (conf.getBoolean(HConstants.MASTER_TYPE_BACKUP,
1731           HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
1732           LOG.debug("HMaster started in backup mode. "
1733             + "Stalling until master znode is written.");
1734           // This will only be a minute or so while the cluster starts up,
1735           // so don't worry about setting watches on the parent znode
1736           while (!activeMasterManager.hasActiveMaster()) {
1737             LOG.debug("Waiting for master address ZNode to be written "
1738               + "(Also watching cluster state node)");
1739             Threads.sleep(timeout);
1740           }
1741         }
1742         MonitoredTask status = TaskMonitor.get().createStatus("Master startup");
1743         status.setDescription("Master startup");
1744         try {
1745           if (activeMasterManager.blockUntilBecomingActiveMaster(timeout, status)) {
1746             finishActiveMasterInitialization(status);
1747           }
1748         } catch (Throwable t) {
1749           status.setStatus("Failed to become active: " + t.getMessage());
1750           LOG.fatal("Failed to become active master", t);
1751           // HBASE-5680: Likely hadoop23 vs hadoop 20.x/1.x incompatibility
1752           if (t instanceof NoClassDefFoundError &&
1753             t.getMessage()
1754               .contains("org/apache/hadoop/hdfs/protocol/HdfsConstants$SafeModeAction")) {
1755             // improved error message for this special case
1756             abort("HBase is having a problem with its Hadoop jars.  You may need to "
1757               + "recompile HBase against Hadoop version "
1758               + org.apache.hadoop.util.VersionInfo.getVersion()
1759               + " or change your hadoop jars to start properly", t);
1760           } else {
1761             abort("Unhandled exception. Starting shutdown.", t);
1762           }
1763         } finally {
1764           status.cleanup();
1765         }
1766       }
1767     }, getServerName().toShortString() + ".activeMasterManager"));
1768   }
1769
1770   private void checkCompression(final HTableDescriptor htd)
1771   throws IOException {
1772     if (!this.masterCheckCompression) return;
1773     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1774       checkCompression(hcd);
1775     }
1776   }
1777
1778   private void checkCompression(final HColumnDescriptor hcd)
1779   throws IOException {
1780     if (!this.masterCheckCompression) return;
1781     CompressionTest.testCompression(hcd.getCompressionType());
1782     CompressionTest.testCompression(hcd.getCompactionCompressionType());
1783   }
1784
1785   private void checkEncryption(final Configuration conf, final HTableDescriptor htd)
1786   throws IOException {
1787     if (!this.masterCheckEncryption) return;
1788     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1789       checkEncryption(conf, hcd);
1790     }
1791   }
1792
1793   private void checkEncryption(final Configuration conf, final HColumnDescriptor hcd)
1794   throws IOException {
1795     if (!this.masterCheckEncryption) return;
1796     EncryptionTest.testEncryption(conf, hcd.getEncryptionType(), hcd.getEncryptionKey());
1797   }
1798
1799   private void checkClassLoading(final Configuration conf, final HTableDescriptor htd)
1800   throws IOException {
1801     RegionSplitPolicy.getSplitPolicyClass(htd, conf);
1802     RegionCoprocessorHost.testTableCoprocessorAttrs(conf, htd);
1803   }
1804
1805   private static boolean isCatalogTable(final TableName tableName) {
1806     return tableName.equals(TableName.META_TABLE_NAME);
1807   }
1808
1809   @Override
1810   public long deleteTable(
1811       final TableName tableName,
1812       final long nonceGroup,
1813       final long nonce) throws IOException {
1814     checkInitialized();
1815     if (cpHost != null) {
1816       cpHost.preDeleteTable(tableName);
1817     }
1818     LOG.info(getClientIdAuditPrefix() + " delete " + tableName);
1819
1820     // TODO: We can handle/merge duplicate request
1821     ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
1822     long procId = this.procedureExecutor.submitProcedure(
1823       new DeleteTableProcedure(procedureExecutor.getEnvironment(), tableName, latch),
1824       nonceGroup,
1825       nonce);
1826     latch.await();
1827
1828     if (cpHost != null) {
1829       cpHost.postDeleteTable(tableName);
1830     }
1831
1832     return procId;
1833   }
1834
1835   @Override
1836   public long truncateTable(
1837       final TableName tableName,
1838       final boolean preserveSplits,
1839       final long nonceGroup,
1840       final long nonce) throws IOException {
1841     checkInitialized();
1842     if (cpHost != null) {
1843       cpHost.preTruncateTable(tableName);
1844     }
1845     LOG.info(getClientIdAuditPrefix() + " truncate " + tableName);
1846
1847     long procId = this.procedureExecutor.submitProcedure(
1848       new TruncateTableProcedure(procedureExecutor.getEnvironment(), tableName, preserveSplits),
1849       nonceGroup,
1850       nonce);
1851     ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1852
1853     if (cpHost != null) {
1854       cpHost.postTruncateTable(tableName);
1855     }
1856     return procId;
1857   }
1858
1859   @Override
1860   public long addColumn(
1861       final TableName tableName,
1862       final HColumnDescriptor columnDescriptor,
1863       final long nonceGroup,
1864       final long nonce)
1865       throws IOException {
1866     checkInitialized();
1867     checkCompression(columnDescriptor);
1868     checkEncryption(conf, columnDescriptor);
1869     checkReplicationScope(columnDescriptor);
1870     if (cpHost != null) {
1871       if (cpHost.preAddColumn(tableName, columnDescriptor)) {
1872         return -1;
1873       }
1874     }
1875     // Execute the operation synchronously - wait for the operation to complete before continuing.
1876     long procId = this.procedureExecutor.submitProcedure(
1877       new AddColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, columnDescriptor),
1878       nonceGroup,
1879       nonce);
1880     ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1881     if (cpHost != null) {
1882       cpHost.postAddColumn(tableName, columnDescriptor);
1883     }
1884     return procId;
1885   }
1886
1887   @Override
1888   public long modifyColumn(
1889       final TableName tableName,
1890       final HColumnDescriptor descriptor,
1891       final long nonceGroup,
1892       final long nonce)
1893       throws IOException {
1894     checkInitialized();
1895     checkCompression(descriptor);
1896     checkEncryption(conf, descriptor);
1897     checkReplicationScope(descriptor);
1898     if (cpHost != null) {
1899       if (cpHost.preModifyColumn(tableName, descriptor)) {
1900         return -1;
1901       }
1902     }
1903     LOG.info(getClientIdAuditPrefix() + " modify " + descriptor);
1904
1905     // Execute the operation synchronously - wait for the operation to complete before continuing.
1906     long procId = this.procedureExecutor.submitProcedure(
1907       new ModifyColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, descriptor),
1908       nonceGroup,
1909       nonce);
1910     ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1911
1912     if (cpHost != null) {
1913       cpHost.postModifyColumn(tableName, descriptor);
1914     }
1915     return procId;
1916   }
1917
1918   @Override
1919   public long deleteColumn(
1920       final TableName tableName,
1921       final byte[] columnName,
1922       final long nonceGroup,
1923       final long nonce)
1924       throws IOException {
1925     checkInitialized();
1926     if (cpHost != null) {
1927       if (cpHost.preDeleteColumn(tableName, columnName)) {
1928         return -1;
1929       }
1930     }
1931     LOG.info(getClientIdAuditPrefix() + " delete " + Bytes.toString(columnName));
1932
1933     // Execute the operation synchronously - wait for the operation to complete before continuing.
1934     long procId = this.procedureExecutor.submitProcedure(
1935       new DeleteColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, columnName),
1936       nonceGroup,
1937       nonce);
1938     ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1939
1940     if (cpHost != null) {
1941       cpHost.postDeleteColumn(tableName, columnName);
1942     }
1943     return procId;
1944   }
1945
1946   @Override
1947   public long enableTable(
1948       final TableName tableName,
1949       final long nonceGroup,
1950       final long nonce) throws IOException {
1951     checkInitialized();
1952     if (cpHost != null) {
1953       cpHost.preEnableTable(tableName);
1954     }
1955     LOG.info(getClientIdAuditPrefix() + " enable " + tableName);
1956
1957     // Execute the operation asynchronously - client will check the progress of the operation
1958     final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
1959     long procId = this.procedureExecutor.submitProcedure(
1960       new EnableTableProcedure(procedureExecutor.getEnvironment(), tableName, false, prepareLatch),
1961       nonceGroup,
1962       nonce);
1963     // Before returning to client, we want to make sure that the table is prepared to be
1964     // enabled (the table is locked and the table state is set).
1965     //
1966     // Note: if the procedure throws exception, we will catch it and rethrow.
1967     prepareLatch.await();
1968
1969     if (cpHost != null) {
1970       cpHost.postEnableTable(tableName);
1971     }
1972
1973     return procId;
1974   }
1975
1976   @Override
1977   public long disableTable(
1978       final TableName tableName,
1979       final long nonceGroup,
1980       final long nonce) throws IOException {
1981     checkInitialized();
1982     if (cpHost != null) {
1983       cpHost.preDisableTable(tableName);
1984     }
1985     LOG.info(getClientIdAuditPrefix() + " disable " + tableName);
1986
1987     // Execute the operation asynchronously - client will check the progress of the operation
1988     final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
1989
1990     long procId = this.procedureExecutor.submitProcedure(
1991       new DisableTableProcedure(procedureExecutor.getEnvironment(), tableName, false, prepareLatch),
1992       nonceGroup,
1993       nonce);
1994     // Before returning to client, we want to make sure that the table is prepared to be
1995     // disabled (the table is locked and the table state is set).
1996     //
1997     // Note: if the procedure throws exception, we will catch it and rethrow.
1998     prepareLatch.await();
1999
2000     if (cpHost != null) {
2001       cpHost.postDisableTable(tableName);
2002     }
2003
2004     return procId;
2005   }
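
       /*
        * The schema operations above (delete/truncate/add/modify/delete-column/enable/disable)
        * share the same client entry points; a combined sketch (illustrative only, hypothetical
        * table name):
        * <pre>
        * try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
        *      Admin admin = conn.getAdmin()) {
        *   TableName tn = TableName.valueOf("my_table");
        *   admin.disableTable(tn);
        *   admin.addColumn(tn, new HColumnDescriptor("cf2"));
        *   admin.deleteColumn(tn, Bytes.toBytes("cf2"));
        *   admin.enableTable(tn);
        *   admin.disableTable(tn);
        *   admin.deleteTable(tn);
        * }
        * </pre>
        */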
2006
2007   /**
2008    * Return the region and current deployment for the region containing
2009    * the given row. If the region cannot be found, returns null. If it
2010    * is found, but not currently deployed, the second element of the pair
2011    * may be null.
2012    */
2013   @VisibleForTesting // Used by TestMaster.
2014   Pair<HRegionInfo, ServerName> getTableRegionForRow(
2015       final TableName tableName, final byte [] rowKey)
2016   throws IOException {
2017     final AtomicReference<Pair<HRegionInfo, ServerName>> result =
2018       new AtomicReference<Pair<HRegionInfo, ServerName>>(null);
2019
2020     MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
2021         @Override
2022         public boolean visit(Result data) throws IOException {
2023           if (data == null || data.size() <= 0) {
2024             return true;
2025           }
2026           Pair<HRegionInfo, ServerName> pair =
2027               new Pair<HRegionInfo, ServerName>(MetaTableAccessor.getHRegionInfo(data),
2028                   MetaTableAccessor.getServerName(data, 0));
2029           if (pair.getFirst() == null) { // a just-constructed pair is never null itself
2030             return false;
2031           }
2032           if (!pair.getFirst().getTable().equals(tableName)) {
2033             return false;
2034           }
2035           result.set(pair);
2036           return true;
2037         }
2038     };
2039
2040     MetaTableAccessor.scanMeta(clusterConnection, visitor, tableName, rowKey, 1);
2041     return result.get();
2042   }
2043
2044   @Override
2045   public long modifyTable(
2046       final TableName tableName,
2047       final HTableDescriptor descriptor,
2048       final long nonceGroup,
2049       final long nonce)
2050       throws IOException {
2051     checkInitialized();
2052     sanityCheckTableDescriptor(descriptor);
2053     if (cpHost != null) {
2054       cpHost.preModifyTable(tableName, descriptor);
2055     }
2056
2057     LOG.info(getClientIdAuditPrefix() + " modify " + tableName);
2058
2059     // Execute the operation synchronously - wait for the operation to complete before continuing.
2060     long procId = this.procedureExecutor.submitProcedure(
2061       new ModifyTableProcedure(procedureExecutor.getEnvironment(), descriptor),
2062       nonceGroup,
2063       nonce);
2064
2065     ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
2066
2067     if (cpHost != null) {
2068       cpHost.postModifyTable(tableName, descriptor);
2069     }
2070
2071     return procId;
2072   }
2073
2074   @Override
2075   public void checkTableModifiable(final TableName tableName)
2076       throws IOException, TableNotFoundException, TableNotDisabledException {
2077     if (isCatalogTable(tableName)) {
2078       throw new IOException("Can't modify catalog tables");
2079     }
2080     if (!MetaTableAccessor.tableExists(getConnection(), tableName)) {
2081       throw new TableNotFoundException(tableName);
2082     }
2083     if (!getTableStateManager().isTableState(tableName, TableState.State.DISABLED)) {
2084       throw new TableNotDisabledException(tableName);
2085     }
2086   }
2087
2088   /**
2089    * @return cluster status
2090    */
2091   public ClusterStatus getClusterStatus() throws InterruptedIOException {
2092     // Build Set of backup masters from ZK nodes
2093     List<String> backupMasterStrings;
2094     try {
2095       backupMasterStrings = ZKUtil.listChildrenNoWatch(this.zooKeeper,
2096         this.zooKeeper.backupMasterAddressesZNode);
2097     } catch (KeeperException e) {
2098       LOG.warn(this.zooKeeper.prefix("Unable to list backup servers"), e);
2099       backupMasterStrings = null;
2100     }
2101
2102     List<ServerName> backupMasters = null;
2103     if (backupMasterStrings != null && !backupMasterStrings.isEmpty()) {
2104       backupMasters = new ArrayList<ServerName>(backupMasterStrings.size());
2105       for (String s: backupMasterStrings) {
2106         try {
2107           byte [] bytes;
2108           try {
2109             bytes = ZKUtil.getData(this.zooKeeper, ZKUtil.joinZNode(
2110                 this.zooKeeper.backupMasterAddressesZNode, s));
2111           } catch (InterruptedException e) {
2112             throw new InterruptedIOException();
2113           }
2114           if (bytes != null) {
2115             ServerName sn;
2116             try {
2117               sn = ServerName.parseFrom(bytes);
2118             } catch (DeserializationException e) {
2119               LOG.warn("Failed parse, skipping registering backup server", e);
2120               continue;
2121             }
2122             backupMasters.add(sn);
2123           }
2124         } catch (KeeperException e) {
2125           LOG.warn(this.zooKeeper.prefix("Unable to get information about " +
2126                    "backup servers"), e);
2127         }
2128       }
2129       Collections.sort(backupMasters, new Comparator<ServerName>() {
2130         @Override
2131         public int compare(ServerName s1, ServerName s2) {
2132           return s1.getServerName().compareTo(s2.getServerName());
2133         }});
2134     }
2135
2136     String clusterId = fileSystemManager != null ?
2137       fileSystemManager.getClusterId().toString() : null;
2138     Set<RegionState> regionsInTransition = assignmentManager != null ?
2139       assignmentManager.getRegionStates().getRegionsInTransition() : null;
2140     String[] coprocessors = cpHost != null ? getMasterCoprocessors() : null;
2141     boolean balancerOn = loadBalancerTracker != null ?
2142       loadBalancerTracker.isBalancerOn() : false;
2143     Map<ServerName, ServerLoad> onlineServers = null;
2144     Set<ServerName> deadServers = null;
2145     if (serverManager != null) {
2146       deadServers = serverManager.getDeadServers().copyServerNames();
2147       onlineServers = serverManager.getOnlineServers();
2148     }
2149     return new ClusterStatus(VersionInfo.getVersion(), clusterId,
2150       onlineServers, deadServers, serverName, backupMasters,
2151       regionsInTransition, coprocessors, balancerOn);
2152   }
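
       /*
        * A read-side sketch (illustrative only) of consuming the status assembled above:
        * <pre>
        * try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
        *      Admin admin = conn.getAdmin()) {
        *   ClusterStatus status = admin.getClusterStatus();
        *   System.out.println("master=" + status.getMaster()
        *       + " backupMasters=" + status.getBackupMastersSize()
        *       + " liveServers=" + status.getServersSize()
        *       + " deadServers=" + status.getDeadServers());
        * }
        * </pre>
        */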
2153
2154   /**
2155    * The set of loaded coprocessors is stored in a static set. Since it's
2156    * statically allocated, it does not require that HMaster's cpHost be
2157    * initialized prior to accessing it.
2158    * @return a String representation of the set of names of the loaded coprocessors.
2159    */
2160   public static String getLoadedCoprocessors() {
2161     return CoprocessorHost.getLoadedCoprocessors().toString();
2162   }
2163
2164   /**
2165    * @return timestamp in millis when HMaster was started.
2166    */
2167   public long getMasterStartTime() {
2168     return startcode;
2169   }
2170
2171   /**
2172    * @return timestamp in millis when HMaster became the active master.
2173    */
2174   public long getMasterActiveTime() {
2175     return masterActiveTime;
2176   }
2177
2178   public int getNumWALFiles() {
2179     return procedureStore != null ? procedureStore.getActiveLogs().size() : 0;
2180   }
2181
2182   public WALProcedureStore getWalProcedureStore() {
2183     return procedureStore;
2184   }
2185
2186   public int getRegionServerInfoPort(final ServerName sn) {
2187     RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
2188     if (info == null || info.getInfoPort() == 0) {
2189       return conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
2190         HConstants.DEFAULT_REGIONSERVER_INFOPORT);
2191     }
2192     return info.getInfoPort();
2193   }
2194
2195   public String getRegionServerVersion(final ServerName sn) {
2196     RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
2197     if (info != null && info.hasVersionInfo()) {
2198       return info.getVersionInfo().getVersion();
2199     }
2200     return "Unknown";
2201   }
2202
2203   /**
2204    * @return array of coprocessor SimpleNames.
2205    */
2206   public String[] getMasterCoprocessors() {
2207     Set<String> masterCoprocessors = getMasterCoprocessorHost().getCoprocessors();
2208     return masterCoprocessors.toArray(new String[masterCoprocessors.size()]);
2209   }
2210
2211   @Override
2212   public void abort(final String msg, final Throwable t) {
2213     if (isAborted() || isStopped()) {
2214       return;
2215     }
2216     if (cpHost != null) {
2217       // HBASE-4014: dump a list of loaded coprocessors.
2218       LOG.fatal("Master server abort: loaded coprocessors are: " +
2219           getLoadedCoprocessors());
2220     }
2221     if (t != null) LOG.fatal(msg, t);
2222     try {
2223       stopMaster();
2224     } catch (IOException e) {
2225       LOG.error("Exception occurred while stopping master", e);
2226     }
2227   }
2228
2229   @Override
2230   public ZooKeeperWatcher getZooKeeper() {
2231     return zooKeeper;
2232   }
2233
2234   @Override
2235   public MasterCoprocessorHost getMasterCoprocessorHost() {
2236     return cpHost;
2237   }
2238
2239   @Override
2240   public MasterQuotaManager getMasterQuotaManager() {
2241     return quotaManager;
2242   }
2243
2244   @Override
2245   public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
2246     return procedureExecutor;
2247   }
2248
2249   @Override
2250   public ServerName getServerName() {
2251     return this.serverName;
2252   }
2253
2254   @Override
2255   public AssignmentManager getAssignmentManager() {
2256     return this.assignmentManager;
2257   }
2258
2259   @Override
2260   public CatalogJanitor getCatalogJanitor() {
2261     return this.catalogJanitorChore;
2262   }
2263
2264   public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
2265     return rsFatals;
2266   }
2267
2268   public void shutdown() throws IOException {
2269     if (cpHost != null) {
2270       cpHost.preShutdown();
2271     }
2272
2273     if (this.serverManager != null) {
2274       this.serverManager.shutdownCluster();
2275     }
2276     if (this.clusterStatusTracker != null){
2277       try {
2278         this.clusterStatusTracker.setClusterDown();
2279       } catch (KeeperException e) {
2280         LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
2281       }
2282     }
2283   }
2284
2285   public void stopMaster() throws IOException {
2286     if (cpHost != null) {
2287       cpHost.preStopMaster();
2288     }
2289     stop("Stopped by " + Thread.currentThread().getName());
2290   }
2291
2292   void checkServiceStarted() throws ServerNotRunningYetException {
2293     if (!serviceStarted) {
2294       throw new ServerNotRunningYetException("Server is not running yet");
2295     }
2296   }
2297
2298   void checkInitialized() throws PleaseHoldException, ServerNotRunningYetException {
2299     checkServiceStarted();
2300     if (!isInitialized()) throw new PleaseHoldException("Master is initializing");
2301   }
2302
2303   /**
2304    * Report whether this master is currently the active master or not.
2305    * If not active master, we are parked on ZK waiting to become active.
2306    *
2307    * This method is used for testing.
2308    *
2309    * @return true if active master, false if not.
2310    */
2311   @Override
2312   public boolean isActiveMaster() {
2313     return isActiveMaster;
2314   }
2315
2316   /**
2317    * Report whether this master has completed with its initialization and is
2318    * ready.  If ready, the master is also the active master.  A standby master
2319    * is never ready.
2320    *
2321    * This method is used for testing.
2322    *
2323    * @return true if master is ready to go, false if not.
2324    */
2325   @Override
2326   public boolean isInitialized() {
2327     return initialized.isReady();
2328   }
2329
2330   /**
2331    * Report whether this master is in maintenance mode.
2332    *
2333    * @return true if master is in maintenanceMode
2334    */
2335   @Override
2336   public boolean isInMaintenanceMode() {
2337     return maintenanceModeTracker.isInMaintenanceMode();
2338   }
2339
2340   @VisibleForTesting
2341   public void setInitialized(boolean isInitialized) {
2342     procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized);
2343   }
2344
2345   public ProcedureEvent getInitializedEvent() {
2346     return initialized;
2347   }
2348
2349   /**
2350    * ServerCrashProcessingEnabled is set false before completing assignMeta to prevent processing
2351    * of crashed servers.
2352    * @return true if assignMeta has completed
2353    */
2354   @Override
2355   public boolean isServerCrashProcessingEnabled() {
2356     return serverCrashProcessingEnabled.isReady();
2357   }
2358
2359   @VisibleForTesting
2360   public void setServerCrashProcessingEnabled(final boolean b) {
2361     procedureExecutor.getEnvironment().setEventReady(serverCrashProcessingEnabled, b);
2362   }
2363
2364   public ProcedureEvent getServerCrashProcessingEnabledEvent() {
2365     return serverCrashProcessingEnabled;
2366   }
2367
2368   /**
2369    * Report whether this master has started initialization and is about to do meta region assignment
2370    * @return true if master is in initialization &amp; about to assign hbase:meta regions
2371    */
2372   public boolean isInitializationStartsMetaRegionAssignment() {
2373     return this.initializationBeforeMetaAssignment;
2374   }
2375
2376   /**
2377    * Compute the average load across all region servers.
2378    * Currently, this uses a very naive computation - just uses the number of
2379    * regions being served, ignoring stats about number of requests.
2380    * @return the average load
2381    */
2382   public double getAverageLoad() {
2383     if (this.assignmentManager == null) {
2384       return 0;
2385     }
2386
2387     RegionStates regionStates = this.assignmentManager.getRegionStates();
2388     if (regionStates == null) {
2389       return 0;
2390     }
2391     return regionStates.getAverageLoad();
2392   }
2393
2394   /**
2395    * @return the count of region split plans executed
2396    */
2397   public long getSplitPlanCount() {
2398     return splitPlanCount;
2399   }
2400
2401   /**
2402    * @return the count of region merge plans executed
2403    */
2404   public long getMergePlanCount() {
2405     return mergePlanCount;
2406   }
2407
2408   @Override
2409   public boolean registerService(Service instance) {
2410     /*
2411      * No stacking of instances is allowed for a single service name
2412      */
2413     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
2414     String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
2415     if (coprocessorServiceHandlers.containsKey(serviceName)) {
2416       LOG.error("Coprocessor service " + serviceName
2417           + " already registered, rejecting request from "
2418           + instance);
2419       return false;
2420     }
2421
2422     coprocessorServiceHandlers.put(serviceName, instance);
2423     if (LOG.isDebugEnabled()) {
2424       LOG.debug("Registered master coprocessor service: service=" + serviceName);
2425     }
2426     return true;
2427   }
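
       /*
        * A hedged sketch of the registration contract above ("MyMasterService" stands in for a
        * protobuf-generated com.google.protobuf.Service implementation):
        * <pre>
        * Service svc = new MyMasterService();
        * boolean first = master.registerService(svc); // true: service name not yet taken
        * boolean again = master.registerService(svc); // false: duplicate name is rejected
        * </pre>
        */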
2428
2429   /**
2430    * Utility for constructing an instance of the passed HMaster class.
2431    * @param masterClass
2432    * @return HMaster instance.
2433    */
2434   public static HMaster constructMaster(Class<? extends HMaster> masterClass,
2435       final Configuration conf, final CoordinatedStateManager cp)  {
2436     try {
2437       Constructor<? extends HMaster> c =
2438         masterClass.getConstructor(Configuration.class, CoordinatedStateManager.class);
2439       return c.newInstance(conf, cp);
2440     } catch(Exception e) {
2441       Throwable error = e;
2442       if (e instanceof InvocationTargetException &&
2443           ((InvocationTargetException)e).getTargetException() != null) {
2444         error = ((InvocationTargetException)e).getTargetException();
2445       }
2446       throw new RuntimeException("Failed construction of Master: "
2447           + masterClass.toString() + ". ", error);
2448     }
2449   }
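
       /*
        * A usage sketch (illustrative only): HMasterCommandLine instantiates the configured
        * master class through this factory; a custom subclass only needs the
        * (Configuration, CoordinatedStateManager) constructor.
        * <pre>
        * Configuration conf = HBaseConfiguration.create();
        * CoordinatedStateManager csm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
        * HMaster master = HMaster.constructMaster(HMaster.class, conf, csm);
        * </pre>
        */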
2450
2451   /**
2452    * @see org.apache.hadoop.hbase.master.HMasterCommandLine
2453    */
2454   public static void main(String [] args) {
2455     VersionInfo.logVersion();
2456     new HMasterCommandLine(HMaster.class).doMain(args);
2457   }
2458
2459   public HFileCleaner getHFileCleaner() {
2460     return this.hfileCleaner;
2461   }
2462
2463   /**
2464    * @return the underlying snapshot manager
2465    */
2466   @Override
2467   public SnapshotManager getSnapshotManager() {
2468     return this.snapshotManager;
2469   }
2470
2471   /**
2472    * @return the underlying MasterProcedureManagerHost
2473    */
2474   @Override
2475   public MasterProcedureManagerHost getMasterProcedureManagerHost() {
2476     return mpmHost;
2477   }
2478
2479   @Override
2480   public ClusterSchema getClusterSchema() {
2481     return this.clusterSchemaService;
2482   }
2483
2484   /**
2485    * Create a new Namespace.
2486    * @param namespaceDescriptor descriptor for new Namespace
2487    * @param nonceGroup Identifier for the source of the request, a client or process.
2488    * @param nonce A unique identifier for this operation from the client or process identified by
2489    * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
2490    * @return procedure id
2491    */
2492   long createNamespace(final NamespaceDescriptor namespaceDescriptor, final long nonceGroup,
2493       final long nonce)
2494   throws IOException {
2495     checkInitialized();
2496     TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName()));
2497     if (this.cpHost != null && this.cpHost.preCreateNamespace(namespaceDescriptor)) {
2498       throw new BypassCoprocessorException();
2499     }
2500     LOG.info(getClientIdAuditPrefix() + " creating " + namespaceDescriptor);
2501     // Execute the operation synchronously - wait for the operation to complete before continuing.
2502     long procId = getClusterSchema().createNamespace(namespaceDescriptor, nonceGroup, nonce);
2503     if (this.cpHost != null) this.cpHost.postCreateNamespace(namespaceDescriptor);
2504     return procId;
2505   }
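
       /*
        * A client-side sketch (illustrative only; "my_ns" is a hypothetical namespace) that
        * routes to the procedure submitted above:
        * <pre>
        * try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
        *      Admin admin = conn.getAdmin()) {
        *   admin.createNamespace(NamespaceDescriptor.create("my_ns").build());
        * }
        * </pre>
        */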
2506
2507   /**
2508    * Modify an existing Namespace.
2509    * @param nonceGroup Identifier for the source of the request, a client or process.
2510    * @param nonce A unique identifier for this operation from the client or process identified by
2511    * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
2512    * @return procedure id
2513    */
2514   long modifyNamespace(final NamespaceDescriptor namespaceDescriptor, final long nonceGroup,
2515       final long nonce)
2516   throws IOException {
2517     checkInitialized();
2518     TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName()));
2519     if (this.cpHost != null && this.cpHost.preModifyNamespace(namespaceDescriptor)) {
2520       throw new BypassCoprocessorException();
2521     }
2522     LOG.info(getClientIdAuditPrefix() + " modify " + namespaceDescriptor);
2523     // Execute the operation synchronously - wait for the operation to complete before continuing.
2524     long procId = getClusterSchema().modifyNamespace(namespaceDescriptor, nonceGroup, nonce);
2525     if (this.cpHost != null) this.cpHost.postModifyNamespace(namespaceDescriptor);
2526     return procId;
2527   }
2528
2529   /**
2530    * Delete an existing Namespace. Only empty Namespaces (no tables) can be removed.
2531    * @param nonceGroup Identifier for the source of the request, a client or process.
2532    * @param nonce A unique identifier for this operation from the client or process identified by
2533    * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
2534    * @return procedure id
2535    */
2536   long deleteNamespace(final String name, final long nonceGroup, final long nonce)
2537   throws IOException {
2538     checkInitialized();
2539     if (this.cpHost != null && this.cpHost.preDeleteNamespace(name)) {
2540       throw new BypassCoprocessorException();
2541     }
2542     LOG.info(getClientIdAuditPrefix() + " delete " + name);
2543     // Execute the operation synchronously - wait for the operation to complete before continuing.
2544     long procId = getClusterSchema().deleteNamespace(name, nonceGroup, nonce);
2545     if (this.cpHost != null) this.cpHost.postDeleteNamespace(name);
2546     return procId;
2547   }
2548
2549   /**
2550    * Get a Namespace
2551    * @param name Name of the Namespace
2552    * @return Namespace descriptor for <code>name</code>
2553    */
2554   NamespaceDescriptor getNamespace(String name) throws IOException {
2555     checkInitialized();
2556     if (this.cpHost != null) this.cpHost.preGetNamespaceDescriptor(name);
2557     NamespaceDescriptor nsd = this.clusterSchemaService.getNamespace(name);
2558     if (this.cpHost != null) this.cpHost.postGetNamespaceDescriptor(nsd);
2559     return nsd;
2560   }
2561
2562   /**
2563    * Get all Namespaces
2564    * @return All Namespace descriptors
2565    */
2566   List<NamespaceDescriptor> getNamespaces() throws IOException {
2567     checkInitialized();
2568     final List<NamespaceDescriptor> nsds = new ArrayList<NamespaceDescriptor>();
2569     boolean bypass = false;
2570     if (cpHost != null) {
2571       bypass = cpHost.preListNamespaceDescriptors(nsds);
2572     }
2573     if (!bypass) {
2574       nsds.addAll(this.clusterSchemaService.getNamespaces());
2575       if (this.cpHost != null) this.cpHost.postListNamespaceDescriptors(nsds);
2576     }
2577     return nsds;
2578   }
2579
2580   @Override
2581   public List<TableName> listTableNamesByNamespace(String name) throws IOException {
2582     checkInitialized();
2583     return listTableNames(name, null, true);
2584   }
2585
2586   @Override
2587   public List<HTableDescriptor> listTableDescriptorsByNamespace(String name) throws IOException {
2588     checkInitialized();
2589     return listTableDescriptors(name, null, null, true);
2590   }
2591
2592   @Override
2593   public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning)
2594       throws IOException {
2595     if (cpHost != null) {
2596       cpHost.preAbortProcedure(this.procedureExecutor, procId);
2597     }
2598
2599     final boolean result = this.procedureExecutor.abort(procId, mayInterruptIfRunning);
2600
2601     if (cpHost != null) {
2602       cpHost.postAbortProcedure();
2603     }
2604
2605     return result;
2606   }
2607
2608   @Override
2609   public List<ProcedureInfo> listProcedures() throws IOException {
2610     if (cpHost != null) {
2611       cpHost.preListProcedures();
2612     }
2613
2614     final List<ProcedureInfo> procInfoList = this.procedureExecutor.listProcedures();
2615
2616     if (cpHost != null) {
2617       cpHost.postListProcedures(procInfoList);
2618     }
2619
2620     return procInfoList;
2621   }
2622
2623   /**
2624    * Returns the list of table descriptors that match the specified request
2625    * @param namespace the namespace to query, or null if querying for all
2626    * @param regex The regular expression to match against, or null if querying for all
2627    * @param tableNameList the list of table names, or null if querying for all
2628    * @param includeSysTables False to match only against userspace tables
2629    * @return the list of table descriptors
2630    */
2631   public List<HTableDescriptor> listTableDescriptors(final String namespace, final String regex,
2632       final List<TableName> tableNameList, final boolean includeSysTables)
2633   throws IOException {
2634     List<HTableDescriptor> htds = new ArrayList<HTableDescriptor>();
2635     boolean bypass = cpHost != null ?
2636         cpHost.preGetTableDescriptors(tableNameList, htds, regex) : false;
2637     if (!bypass) {
2638       htds = getTableDescriptors(htds, namespace, regex, tableNameList, includeSysTables);
2639       if (cpHost != null) {
2640         cpHost.postGetTableDescriptors(tableNameList, htds, regex);
2641       }
2642     }
2643     return htds;
2644   }
2645
2646   /**
2647    * Returns the list of table names that match the specified request
2648    * @param regex The regular expression to match against, or null if querying for all
2649    * @param namespace the namespace to query, or null if querying for all
2650    * @param includeSysTables False to match only against userspace tables
2651    * @return the list of table names
2652    */
2653   public List<TableName> listTableNames(final String namespace, final String regex,
2654       final boolean includeSysTables) throws IOException {
2655     List<HTableDescriptor> htds = new ArrayList<HTableDescriptor>();
2656     boolean bypass = cpHost != null ? cpHost.preGetTableNames(htds, regex) : false;
2657     if (!bypass) {
2658       htds = getTableDescriptors(htds, namespace, regex, null, includeSysTables);
2659       if (cpHost != null) cpHost.postGetTableNames(htds, regex);
2660     }
2661     List<TableName> result = new ArrayList<TableName>(htds.size());
2662     for (HTableDescriptor htd: htds) result.add(htd.getTableName());
2663     return result;
2664   }
2665
2666   /**
2667    * @return list of table descriptors after filtering by regex and whether to include
2668    *    system tables, etc.
2669    * @throws IOException
2670    */
2671   private List<HTableDescriptor> getTableDescriptors(final List<HTableDescriptor> htds,
2672       final String namespace, final String regex, final List<TableName> tableNameList,
2673       final boolean includeSysTables)
2674   throws IOException {
2675     if (tableNameList == null || tableNameList.size() == 0) {
2676       // request for all TableDescriptors
2677       Collection<HTableDescriptor> allHtds;
2678       if (namespace != null && namespace.length() > 0) {
2679         // Check that the namespace exists; fails if it does not.
2680         this.clusterSchemaService.getNamespace(namespace);
2681         allHtds = tableDescriptors.getByNamespace(namespace).values();
2682       } else {
2683         allHtds = tableDescriptors.getAll().values();
2684       }
2685       for (HTableDescriptor desc: allHtds) {
2686         if (tableStateManager.isTablePresent(desc.getTableName())
2687             && (includeSysTables || !desc.getTableName().isSystemTable())) {
2688           htds.add(desc);
2689         }
2690       }
2691     } else {
2692       for (TableName s: tableNameList) {
2693         if (tableStateManager.isTablePresent(s)) {
2694           HTableDescriptor desc = tableDescriptors.get(s);
2695           if (desc != null) {
2696             htds.add(desc);
2697           }
2698         }
2699       }
2700     }
2701
2702     // Retains only those matched by regular expression.
2703     if (regex != null) filterTablesByRegex(htds, Pattern.compile(regex));
2704     return htds;
2705   }
2706
2707   /**
2708    * Removes the table descriptors that don't match the pattern.
2709    * @param descriptors list of table descriptors to filter
2710    * @param pattern the regex to use
2711    */
2712   private static void filterTablesByRegex(final Collection<HTableDescriptor> descriptors,
2713       final Pattern pattern) {
2714     final String defaultNS = NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
2715     Iterator<HTableDescriptor> itr = descriptors.iterator();
2716     while (itr.hasNext()) {
2717       HTableDescriptor htd = itr.next();
2718       String tableName = htd.getTableName().getNameAsString();
2719       boolean matched = pattern.matcher(tableName).matches();
2720       if (!matched && htd.getTableName().getNamespaceAsString().equals(defaultNS)) {
2721         matched = pattern.matcher(defaultNS + TableName.NAMESPACE_DELIM + tableName).matches();
2722       }
2723       if (!matched) {
2724         itr.remove();
2725       }
2726     }
2727   }
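
  /**
   * Illustrative sketch, not part of the original source: demonstrates the default-namespace
   * retry performed by filterTablesByRegex above. For a default-namespace table,
   * getNameAsString() omits the namespace, so the pattern "default:foo" only matches on the
   * second attempt; a table in any other namespace must be matched by a pattern covering its
   * fully qualified name, e.g. "ns1:bar" or "ns1:.*".
   */
  private static boolean exampleDefaultNamespaceRetry() {
    String tableName = "foo"; // what getNameAsString() yields for a default-namespace table
    Pattern pattern = Pattern.compile("default:foo");
    boolean matched = pattern.matcher(tableName).matches(); // false on the first attempt
    if (!matched) {
      // the retry prepends "default:" exactly as filterTablesByRegex does
      matched = pattern.matcher(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR
          + TableName.NAMESPACE_DELIM + tableName).matches(); // true
    }
    return matched;
  }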
2728
2729   @Override
2730   public long getLastMajorCompactionTimestamp(TableName table) throws IOException {
2731     return getClusterStatus().getLastMajorCompactionTsForTable(table);
2732   }
2733
2734   @Override
2735   public long getLastMajorCompactionTimestampForRegion(byte[] regionName) throws IOException {
2736     return getClusterStatus().getLastMajorCompactionTsForRegion(regionName);
2737   }
2738
2739   /**
2740    * Gets the mob file compaction state for a specific table.
2741    * Whether all the mob files are selected is only known during compaction execution, while
2742    * the state is reported just before the compaction starts, so the compaction type cannot
2743    * be determined at that point. A coarse state is therefore reported; only two compaction
2744    * states are used, CompactionState.MAJOR_AND_MINOR and CompactionState.NONE.
2745    * @param tableName The table to check.
2746    * @return Whether the given table is currently undergoing a mob file compaction.
2747    */
2748   public CompactionState getMobCompactionState(TableName tableName) {
2749     AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
2750     if (compactionsCount != null && compactionsCount.get() != 0) {
2751       return CompactionState.MAJOR_AND_MINOR;
2752     }
2753     return CompactionState.NONE;
2754   }
2755
2756   public void reportMobCompactionStart(TableName tableName) throws IOException {
2757     IdLock.Entry lockEntry = null;
2758     try {
2759       lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
2760       AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
2761       if (compactionsCount == null) {
2762         compactionsCount = new AtomicInteger(0);
2763         mobCompactionStates.put(tableName, compactionsCount);
2764       }
2765       compactionsCount.incrementAndGet();
2766     } finally {
2767       if (lockEntry != null) {
2768         mobCompactionLock.releaseLockEntry(lockEntry);
2769       }
2770     }
2771   }
2772
2773   public void reportMobCompactionEnd(TableName tableName) throws IOException {
2774     IdLock.Entry lockEntry = null;
2775     try {
2776       lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
2777       AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
2778       if (compactionsCount != null) {
2779         int count = compactionsCount.decrementAndGet();
2780         // remove the entry if the count is 0.
2781         if (count == 0) {
2782           mobCompactionStates.remove(tableName);
2783         }
2784       }
2785     } finally {
2786       if (lockEntry != null) {
2787         mobCompactionLock.releaseLockEntry(lockEntry);
2788       }
2789     }
2790   }
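
  /**
   * Illustrative sketch, not part of the original source: how the per-table counter managed
   * by the two report methods above maps to the coarse state returned by
   * getMobCompactionState(tableName).
   */
  private void exampleMobCompactionLifecycle(TableName tn) throws IOException {
    reportMobCompactionStart(tn); // count 0 -> 1; state becomes MAJOR_AND_MINOR
    reportMobCompactionStart(tn); // a concurrent compaction; count 1 -> 2
    reportMobCompactionEnd(tn);   // count 2 -> 1; state is still MAJOR_AND_MINOR
    reportMobCompactionEnd(tn);   // count 1 -> 0; the entry is removed
    assert getMobCompactionState(tn) == CompactionState.NONE;
  }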
2791
2792   /**
2793    * Requests mob compaction.
2794    * @param tableName The table to compact.
2795    * @param columns The column families to compact.
2796    * @param allFiles Whether to include all mob files in the compaction.
2797    */
2798   public void requestMobCompaction(TableName tableName,
2799     List<HColumnDescriptor> columns, boolean allFiles) throws IOException {
2800     mobCompactThread.requestMobCompaction(conf, fs, tableName, columns,
2801       tableLockManager, allFiles);
2802   }
2803
2804   /**
2805    * Queries the state of the {@link LoadBalancerTracker}. If the balancer is not initialized,
2806    * false is returned.
2807    *
2808    * @return The state of the load balancer, or false if it is not defined or the master is in maintenance mode.
2809    */
2810   public boolean isBalancerOn() {
2811     if (null == loadBalancerTracker || isInMaintenanceMode()) {
2812       return false;
2813     }
2814     return loadBalancerTracker.isBalancerOn();
2815   }
2816
2817   /**
2818    * Queries the state of the {@link RegionNormalizerTracker}. If it's not initialized,
2819    * false is returned.
2820    */
2821   public boolean isNormalizerOn() {
2822     return (null == regionNormalizerTracker || isInMaintenanceMode()) ?
2823         false : regionNormalizerTracker.isNormalizerOn();
2824   }
2825
2826   /**
2827    * Queries the state of the {@link SplitOrMergeTracker}. If it is not initialized,
2828    * false is returned. If switchType is illegal, false is returned as well.
2829    * @param switchType see {@link org.apache.hadoop.hbase.client.MasterSwitchType}
2830    * @return The state of the switch
2831    */
2832   public boolean isSplitOrMergeEnabled(MasterSwitchType switchType) {
2833     if (null == splitOrMergeTracker || isInMaintenanceMode()) {
2834       return false;
2835     }
2836     return splitOrMergeTracker.isSplitOrMergeEnabled(switchType);
2837   }
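
  /**
   * Illustrative sketch, not part of the original source: the split and merge switches are
   * queried independently, and both report false while the master is in maintenance mode,
   * regardless of the tracker state.
   */
  private boolean exampleSplitAndMergeBothEnabled() {
    return isSplitOrMergeEnabled(MasterSwitchType.SPLIT)
        && isSplitOrMergeEnabled(MasterSwitchType.MERGE);
  }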
2838
2839   /**
2840    * Fetch the configured {@link LoadBalancer} class name. If none is set, a default is returned.
2841    *
2842    * @return The name of the {@link LoadBalancer} in use.
2843    */
2844   public String getLoadBalancerClassName() {
2845     return conf.get(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, LoadBalancerFactory
2846         .getDefaultLoadBalancerClass().getName());
2847   }
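
  /**
   * Illustrative sketch, not part of the original source: the balancer implementation is
   * selected through the configuration key held in HConstants.HBASE_MASTER_LOADBALANCER_CLASS
   * ("hbase.master.loadbalancer.class"); when the key is unset,
   * LoadBalancerFactory.getDefaultLoadBalancerClass() supplies the default.
   */
  private static void exampleOverrideBalancerClass(Configuration conf) {
    // The class named below is the stochastic balancer shipped with HBase; any LoadBalancer
    // implementation on the classpath could be configured instead.
    conf.set(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
        "org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer");
  }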
2848
2849   /**
2850    * @return RegionNormalizerTracker instance
2851    */
2852   public RegionNormalizerTracker getRegionNormalizerTracker() {
2853     return regionNormalizerTracker;
2854   }
2855
2856   public SplitOrMergeTracker getSplitOrMergeTracker() {
2857     return splitOrMergeTracker;
2858   }
2859
2860   @Override
2861   public LoadBalancer getLoadBalancer() {
2862     return balancer;
2863   }
2864 }