
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import com.google.common.annotations.VisibleForTesting;
22  import com.google.common.collect.Lists;
23  import com.google.common.collect.Maps;
24  import com.google.protobuf.Descriptors;
25  import com.google.protobuf.Service;
26  
27  import java.io.IOException;
28  import java.io.InterruptedIOException;
29  import java.lang.reflect.Constructor;
30  import java.lang.reflect.InvocationTargetException;
31  import java.net.InetAddress;
32  import java.net.InetSocketAddress;
33  import java.net.UnknownHostException;
34  import java.util.ArrayList;
35  import java.util.Collection;
36  import java.util.Collections;
37  import java.util.Comparator;
38  import java.util.HashMap;
39  import java.util.Iterator;
40  import java.util.List;
41  import java.util.Map;
42  import java.util.Map.Entry;
43  import java.util.Set;
44  import java.util.concurrent.CountDownLatch;
45  import java.util.concurrent.TimeUnit;
46  import java.util.concurrent.atomic.AtomicInteger;
47  import java.util.concurrent.atomic.AtomicReference;
48  import java.util.regex.Pattern;
49
50  import javax.servlet.ServletException;
51  import javax.servlet.http.HttpServlet;
52  import javax.servlet.http.HttpServletRequest;
53  import javax.servlet.http.HttpServletResponse;
54
55  import org.apache.commons.logging.Log;
56  import org.apache.commons.logging.LogFactory;
57  import org.apache.hadoop.conf.Configuration;
58  import org.apache.hadoop.fs.Path;
59  import org.apache.hadoop.hbase.ClusterStatus;
60  import org.apache.hadoop.hbase.CoordinatedStateException;
61  import org.apache.hadoop.hbase.CoordinatedStateManager;
62  import org.apache.hadoop.hbase.DoNotRetryIOException;
63  import org.apache.hadoop.hbase.HBaseIOException;
64  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
65  import org.apache.hadoop.hbase.HColumnDescriptor;
66  import org.apache.hadoop.hbase.HConstants;
67  import org.apache.hadoop.hbase.HRegionInfo;
68  import org.apache.hadoop.hbase.HTableDescriptor;
69  import org.apache.hadoop.hbase.MasterNotRunningException;
70  import org.apache.hadoop.hbase.MetaTableAccessor;
71  import org.apache.hadoop.hbase.NamespaceDescriptor;
72  import org.apache.hadoop.hbase.PleaseHoldException;
73  import org.apache.hadoop.hbase.ProcedureInfo;
74  import org.apache.hadoop.hbase.RegionStateListener;
75  import org.apache.hadoop.hbase.ScheduledChore;
76  import org.apache.hadoop.hbase.ServerLoad;
77  import org.apache.hadoop.hbase.ServerName;
78  import org.apache.hadoop.hbase.TableDescriptors;
79  import org.apache.hadoop.hbase.TableName;
80  import org.apache.hadoop.hbase.TableNotDisabledException;
81  import org.apache.hadoop.hbase.TableNotFoundException;
82  import org.apache.hadoop.hbase.UnknownRegionException;
83  import org.apache.hadoop.hbase.classification.InterfaceAudience;
84  import org.apache.hadoop.hbase.client.MasterSwitchType;
85  import org.apache.hadoop.hbase.client.Result;
86  import org.apache.hadoop.hbase.client.TableState;
87  import org.apache.hadoop.hbase.coprocessor.BypassCoprocessorException;
88  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
89  import org.apache.hadoop.hbase.exceptions.DeserializationException;
90  import org.apache.hadoop.hbase.exceptions.MergeRegionException;
91  import org.apache.hadoop.hbase.executor.ExecutorType;
92  import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
93  import org.apache.hadoop.hbase.ipc.RpcServer;
94  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
95  import org.apache.hadoop.hbase.master.MasterRpcServices.BalanceSwitchMode;
96  import org.apache.hadoop.hbase.master.balancer.BalancerChore;
97  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
98  import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
99  import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
100 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
101 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
102 import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
103 import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore;
104 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
105 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
106 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
107 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
108 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
109 import org.apache.hadoop.hbase.master.procedure.AddColumnFamilyProcedure;
110 import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
111 import org.apache.hadoop.hbase.master.procedure.DeleteColumnFamilyProcedure;
112 import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
113 import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
114 import org.apache.hadoop.hbase.master.procedure.DispatchMergingRegionsProcedure;
115 import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
116 import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
117 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
118 import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
119 import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure;
120 import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
121 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
122 import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
123 import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
124 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
125 import org.apache.hadoop.hbase.mob.MobConstants;
126 import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
127 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
128 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
129 import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
130 import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
131 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
132 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
133 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
134 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
135 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
136 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
137 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
138 import org.apache.hadoop.hbase.regionserver.HRegionServer;
139 import org.apache.hadoop.hbase.regionserver.HStore;
140 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
141 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
142 import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
143 import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
144 import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
145 import org.apache.hadoop.hbase.replication.ReplicationFactory;
146 import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
147 import org.apache.hadoop.hbase.replication.master.TableCFsUpdater;
148 import org.apache.hadoop.hbase.replication.regionserver.Replication;
149 import org.apache.hadoop.hbase.security.UserProvider;
150 import org.apache.hadoop.hbase.util.Addressing;
151 import org.apache.hadoop.hbase.util.Bytes;
152 import org.apache.hadoop.hbase.util.CompressionTest;
153 import org.apache.hadoop.hbase.util.EncryptionTest;
154 import org.apache.hadoop.hbase.util.FSUtils;
155 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
156 import org.apache.hadoop.hbase.util.HasThread;
157 import org.apache.hadoop.hbase.util.IdLock;
158 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
159 import org.apache.hadoop.hbase.util.Pair;
160 import org.apache.hadoop.hbase.util.Threads;
161 import org.apache.hadoop.hbase.util.VersionInfo;
162 import org.apache.hadoop.hbase.util.ZKDataMigrator;
163 import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
164 import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
165 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
166 import org.apache.hadoop.hbase.zookeeper.MasterMaintenanceModeTracker;
167 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
168 import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
169 import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
170 import org.apache.hadoop.hbase.zookeeper.SplitOrMergeTracker;
171 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
172 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
173 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
174 import org.apache.zookeeper.KeeperException;
175 import org.mortbay.jetty.Connector;
176 import org.mortbay.jetty.nio.SelectChannelConnector;
177 import org.mortbay.jetty.servlet.Context;
178
179 /**
180  * HMaster is the "master server" for HBase. An HBase cluster has one active
181  * master.  If many masters are started, all compete.  Whichever wins goes on to
182  * run the cluster.  All others park themselves in their constructor until
183  * master or cluster shutdown or until the active master loses its lease in
184  * zookeeper.  Thereafter, all running masters jostle to take over the master role.
185  *
186  * <p>The Master can be asked to shut down the cluster. See {@link #shutdown()}.  In
187  * this case it will tell all regionservers to go down and then wait for them
188  * all to report in that they are down.  This master will then shut itself down.
189  *
190  * <p>You can also shut down just this master.  Call {@link #stopMaster()}.
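 *
 * <p>A minimal client-side sketch of both shutdown paths (assuming an open
 * client {@code Connection} named {@code conn}; illustrative only, not part of this class):
 * <pre>
 *   try (Admin admin = conn.getAdmin()) {
 *     admin.stopMaster(); // stop only this master; a backup master can take over
 *     // admin.shutdown();  // or ask the Master to shut down the whole cluster
 *   }
 * </pre>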
191  *
192  * @see org.apache.zookeeper.Watcher
193  */
194 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
195 @SuppressWarnings("deprecation")
196 public class HMaster extends HRegionServer implements MasterServices {
197   private static final Log LOG = LogFactory.getLog(HMaster.class.getName());
198
199
200   /**
201    * Protection against a zombie master. Started once the Master accepts the active role and
202    * starts taking over responsibilities. Allows a finite time window before giving up ownership.
203    */
204   private static class InitializationMonitor extends HasThread {
205     /** The amount of time in milliseconds to sleep before checking initialization status. */
206     public static final String TIMEOUT_KEY = "hbase.master.initializationmonitor.timeout";
207     public static final long TIMEOUT_DEFAULT = TimeUnit.MILLISECONDS.convert(15, TimeUnit.MINUTES);
208
209     /**
210      * When the timeout expires and initialization has not completed, call
211      * {@link System#exit(int)} when this is set to true; do nothing otherwise.
212      */
213     public static final String HALT_KEY = "hbase.master.initializationmonitor.haltontimeout";
214     public static final boolean HALT_DEFAULT = false;
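
    // A minimal sketch of tuning this monitor through site configuration; the
    // values below are illustrative assumptions, not recommendations:
    //
    //   Configuration conf = HBaseConfiguration.create();
    //   conf.setLong(TIMEOUT_KEY, TimeUnit.MINUTES.toMillis(30));
    //   conf.setBoolean(HALT_KEY, true); // halt the JVM on timeout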
215
216     private final HMaster master;
217     private final long timeout;
218     private final boolean haltOnTimeout;
219
220     /** Creates a Thread that monitors the {@link #isInitialized()} state. */
221     InitializationMonitor(HMaster master) {
222       super("MasterInitializationMonitor");
223       this.master = master;
224       this.timeout = master.getConfiguration().getLong(TIMEOUT_KEY, TIMEOUT_DEFAULT);
225       this.haltOnTimeout = master.getConfiguration().getBoolean(HALT_KEY, HALT_DEFAULT);
226       this.setDaemon(true);
227     }
228
229     @Override
230     public void run() {
231       try {
232         while (!master.isStopped() && master.isActiveMaster()) {
233           Thread.sleep(timeout);
234           if (master.isInitialized()) {
235             LOG.debug("Initialization completed within allotted tolerance. Monitor exiting.");
236           } else {
237             LOG.error("Master failed to complete initialization after " + timeout + "ms. Please"
238                 + " consider submitting a bug report including a thread dump of this process.");
239             if (haltOnTimeout) {
240               LOG.error("Zombie Master exiting. Thread dump to stdout");
241               Threads.printThreadInfo(System.out, "Zombie HMaster");
242               System.exit(-1);
243             }
244           }
245         }
246       } catch (InterruptedException ie) {
247         LOG.trace("InitMonitor thread interrupted. Exiting.");
248       }
249     }
250   }
251
252   // MASTER is the name of the webapp and the attribute name used for stuffing this
253   // instance into the web context.
254   public static final String MASTER = "master";
255
256   // Manager and zk listener for master election
257   private final ActiveMasterManager activeMasterManager;
258   // Region server tracker
259   RegionServerTracker regionServerTracker;
260   // Draining region server tracker
261   private DrainingServerTracker drainingServerTracker;
262   // Tracker for load balancer state
263   LoadBalancerTracker loadBalancerTracker;
264
265   // Tracker for split and merge state
266   private SplitOrMergeTracker splitOrMergeTracker;
267
268   // Tracker for region normalizer state
269   private RegionNormalizerTracker regionNormalizerTracker;
270
271   // Tracker for the master maintenance mode setting
272   private MasterMaintenanceModeTracker maintenanceModeTracker;
273
274   private ClusterSchemaService clusterSchemaService;
275
276   // Metrics for the HMaster
277   final MetricsMaster metricsMaster;
278   // file system manager for the master FS operations
279   private MasterFileSystem fileSystemManager;
280   private MasterWalManager walManager;
281
282   // server manager to deal with region server info
283   private volatile ServerManager serverManager;
284
285   // manager of assignment nodes in zookeeper
286   private AssignmentManager assignmentManager;
287
288   // buffer for "fatal error" notices from region servers
289   // in the cluster. This is only used for assisting
290   // operations/debugging.
291   MemoryBoundedLogMessageBuffer rsFatals;
292
293   // flag set after we become the active master (used for testing)
294   private volatile boolean isActiveMaster = false;
295
296   // flag set after we complete initialization once active;
297   // unit tests drive it via setInitialized()
298   private final ProcedureEvent initialized = new ProcedureEvent("master initialized");
299
300   // flag set after master services are started,
301   // initialization may not have completed yet.
302   volatile boolean serviceStarted = false;
303
304   // flag set after we complete assignMeta.
305   private final ProcedureEvent serverCrashProcessingEnabled =
306     new ProcedureEvent("server crash processing");
307
308   private LoadBalancer balancer;
309   private RegionNormalizer normalizer;
310   private BalancerChore balancerChore;
311   private RegionNormalizerChore normalizerChore;
312   private ClusterStatusChore clusterStatusChore;
313   private ClusterStatusPublisher clusterStatusPublisherChore = null;
314   private PeriodicDoMetrics periodicDoMetricsChore = null;
315
316   CatalogJanitor catalogJanitorChore;
317   private ReplicationZKLockCleanerChore replicationZKLockCleanerChore;
318   private ReplicationMetaCleaner replicationMetaCleaner;
319   private LogCleaner logCleaner;
320   private HFileCleaner hfileCleaner;
321   private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
322   private MobCompactionChore mobCompactChore;
323   private MasterMobCompactionThread mobCompactThread;
324   // used to synchronize the mobCompactionStates
325   private final IdLock mobCompactionLock = new IdLock();
326   // save the information of mob compactions in tables.
327   // the key is table name, the value is the number of compactions in that table.
328   private Map<TableName, AtomicInteger> mobCompactionStates = Maps.newConcurrentMap();
329
330   MasterCoprocessorHost cpHost;
331
332   private final boolean preLoadTableDescriptors;
333
334   // Time stamp for when this hmaster became active
335   private long masterActiveTime;
336
337   // should we check the compression codec type on the master side, default true, HBASE-6370
338   private final boolean masterCheckCompression;
339
340   // should we check encryption settings on the master side, default true
341   private final boolean masterCheckEncryption;
342
343   Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
344
345   // monitor for snapshot of hbase tables
346   SnapshotManager snapshotManager;
347   // monitor for distributed procedures
348   private MasterProcedureManagerHost mpmHost;
349
350   // it is assigned after the 'initialized' guard is set to true, so it should be volatile
351   private volatile MasterQuotaManager quotaManager;
352
353   private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
354   private WALProcedureStore procedureStore;
355
356   // handle table states
357   private TableStateManager tableStateManager;
358
359   private long splitPlanCount;
360   private long mergePlanCount;
361
362   /** flag used in test cases in order to simulate RS failures during master initialization */
363   private volatile boolean initializationBeforeMetaAssignment = false;
364
365   /** jetty server for master to redirect requests to regionserver infoServer */
366   private org.mortbay.jetty.Server masterJettyServer;
367
368   public static class RedirectServlet extends HttpServlet {
369     private static final long serialVersionUID = 2894774810058302472L;
370     private static int regionServerInfoPort;
371
372     @Override
373     public void doGet(HttpServletRequest request,
374         HttpServletResponse response) throws ServletException, IOException {
375       String redirectUrl = request.getScheme() + "://"
376         + request.getServerName() + ":" + regionServerInfoPort
377         + request.getRequestURI();
378       response.sendRedirect(redirectUrl);
379     }
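
    // Example of the redirect performed above, assuming default ports (master
    // info server on 16010, region server info server on 16030); illustrative:
    //   request:  http://master-host:16010/master-status
    //   redirect: http://master-host:16030/master-status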
380   }
381
382   private static class PeriodicDoMetrics extends ScheduledChore {
383     private final HMaster server;
384     public PeriodicDoMetrics(int doMetricsInterval, final HMaster server) {
385       super(server.getServerName() + "-DoMetricsChore", server, doMetricsInterval);
386       this.server = server;
387     }
388
389     @Override
390     protected void chore() {
391       server.doMetrics();
392     }
393   }
394
395   /**
396    * Initializes the HMaster. The steps are as follows:
397    * <p>
398    * <ol>
399    * <li>Initialize the local HRegionServer
400    * <li>Start the ActiveMasterManager.
401    * </ol>
402    * <p>
403    * Remaining steps of initialization occur in
404    * {@link #finishActiveMasterInitialization(MonitoredTask)} after
405    * the master becomes the active one.
406    */
407   public HMaster(final Configuration conf, CoordinatedStateManager csm)
408       throws IOException, KeeperException {
409     super(conf, csm);
410     this.rsFatals = new MemoryBoundedLogMessageBuffer(
411       conf.getLong("hbase.master.buffer.for.rs.fatals", 1*1024*1024));
412
413     LOG.info("hbase.rootdir=" + FSUtils.getRootDir(this.conf) +
414       ", hbase.cluster.distributed=" + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));
415
416     // Disable usage of meta replicas in the master
417     this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
418
419     Replication.decorateMasterConfiguration(this.conf);
420
421     // Hack! Maps DFSClient => Master for logs.  HDFS made this
422     // config param for task trackers, but we can piggyback off of it.
423     if (this.conf.get("mapreduce.task.attempt.id") == null) {
424       this.conf.set("mapreduce.task.attempt.id", "hb_m_" + this.serverName.toString());
425     }
426
427     // should we check the compression codec type on the master side, default true, HBASE-6370
428     this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true);
429
430     // should we check encryption settings on the master side, default true
431     this.masterCheckEncryption = conf.getBoolean("hbase.master.check.encryption", true);
432
433     this.metricsMaster = new MetricsMaster(new MetricsMasterWrapperImpl(this));
434
435     // preload table descriptor at startup
436     this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);
437
438     // Do we publish the status?
439
440     boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
441         HConstants.STATUS_PUBLISHED_DEFAULT);
442     Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
443         conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
444             ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
445             ClusterStatusPublisher.Publisher.class);
446
447     if (shouldPublish) {
448       if (publisherClass == null) {
449         LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
450             ClusterStatusPublisher.STATUS_PUBLISHER_CLASS +
451             " is not set - not publishing status");
452       } else {
453         clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
454         getChoreService().scheduleChore(clusterStatusPublisherChore);
455       }
456     }
457
458     // Some unit tests don't need a cluster, so no zookeeper at all
459     if (!conf.getBoolean("hbase.testing.nocluster", false)) {
460       setInitLatch(new CountDownLatch(1));
461       activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this);
462       int infoPort = putUpJettyServer();
463       startActiveMasterManager(infoPort);
464     } else {
465       activeMasterManager = null;
466     }
467   }
468
469   // Return the actual infoPort; -1 means the info server is disabled.
470   private int putUpJettyServer() throws IOException {
471     if (!conf.getBoolean("hbase.master.infoserver.redirect", true)) {
472       return -1;
473     }
474     int infoPort = conf.getInt("hbase.master.info.port.orig",
475       HConstants.DEFAULT_MASTER_INFOPORT);
476     // -1 means the info server is disabled, so there is no redirecting
477     if (infoPort < 0 || infoServer == null) {
478       return -1;
479     }
480     String addr = conf.get("hbase.master.info.bindAddress", "0.0.0.0");
481     if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
482       String msg =
483           "Failed to start redirecting jetty server. Address " + addr
484               + " does not belong to this host. Correct configuration parameter: "
485               + "hbase.master.info.bindAddress";
486       LOG.error(msg);
487       throw new IOException(msg);
488     }
489
490     RedirectServlet.regionServerInfoPort = infoServer.getPort();
491     if (RedirectServlet.regionServerInfoPort == infoPort) {
492       return infoPort;
493     }
494     masterJettyServer = new org.mortbay.jetty.Server();
495     Connector connector = new SelectChannelConnector();
496     connector.setHost(addr);
497     connector.setPort(infoPort);
498     masterJettyServer.addConnector(connector);
499     masterJettyServer.setStopAtShutdown(true);
500     Context context = new Context(masterJettyServer, "/", Context.NO_SESSIONS);
501     context.addServlet(RedirectServlet.class, "/*");
502     try {
503       masterJettyServer.start();
504     } catch (Exception e) {
505       throw new IOException("Failed to start redirecting jetty server", e);
506     }
507     return connector.getLocalPort();
508   }
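
  // A minimal sketch of the settings putUpJettyServer() reads; the values are
  // illustrative assumptions matching the defaults used above:
  //
  //   conf.setBoolean("hbase.master.infoserver.redirect", true);
  //   conf.setInt("hbase.master.info.port.orig", HConstants.DEFAULT_MASTER_INFOPORT);
  //   conf.set("hbase.master.info.bindAddress", "0.0.0.0");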
509
510   @Override
511   protected TableDescriptors getFsTableDescriptors() throws IOException {
512     return super.getFsTableDescriptors();
513   }
514
515   /**
516    * For compatibility, if login with the regionserver credentials fails, try the master ones
517    */
518   @Override
519   protected void login(UserProvider user, String host) throws IOException {
520     try {
521       super.login(user, host);
522     } catch (IOException ie) {
523       user.login("hbase.master.keytab.file",
524         "hbase.master.kerberos.principal", host);
525     }
526   }
527
528   /**
529    * If configured to put regions on the active master,
530    * wait until this master becomes the active one.
531    * Otherwise, loop until the server is stopped or aborted.
532    */
533   @Override
534   protected void waitForMasterActive() {
535     boolean tablesOnMaster = BaseLoadBalancer.tablesOnMaster(conf);
536     while (!(tablesOnMaster && isActiveMaster)
537         && !isStopped() && !isAborted()) {
538       sleeper.sleep();
539     }
540   }
541
542   @VisibleForTesting
543   public MasterRpcServices getMasterRpcServices() {
544     return (MasterRpcServices)rpcServices;
545   }
546
547   public boolean balanceSwitch(final boolean b) throws IOException {
548     return getMasterRpcServices().switchBalancer(b, BalanceSwitchMode.ASYNC);
549   }
550
551   @Override
552   protected String getProcessName() {
553     return MASTER;
554   }
555
556   @Override
557   protected boolean canCreateBaseZNode() {
558     return true;
559   }
560
561   @Override
562   protected boolean canUpdateTableDescriptor() {
563     return true;
564   }
565
566   @Override
567   protected RSRpcServices createRpcServices() throws IOException {
568     return new MasterRpcServices(this);
569   }
570
571   @Override
572   protected void configureInfoServer() {
573     infoServer.addServlet("master-status", "/master-status", MasterStatusServlet.class);
574     infoServer.setAttribute(MASTER, this);
575     if (BaseLoadBalancer.tablesOnMaster(conf)) {
576       super.configureInfoServer();
577     }
578   }
579
580   @Override
581   protected Class<? extends HttpServlet> getDumpServlet() {
582     return MasterDumpServlet.class;
583   }
584
585   /**
586    * Emit the HMaster metrics, such as region-in-transition metrics.
587    * The body is wrapped in a try block to make sure a metrics failure doesn't abort the HMaster.
588    */
589   private void doMetrics() {
590     try {
591       if (assignmentManager != null) {
592         assignmentManager.updateRegionsInTransitionMetrics();
593       }
594     } catch (Throwable e) {
595       LOG.error("Couldn't update metrics: " + e.getMessage());
596     }
597   }
598
599   MetricsMaster getMasterMetrics() {
600     return metricsMaster;
601   }
602
603   /**
604    * Initialize all ZK based system trackers.
605    */
606   void initializeZKBasedSystemTrackers() throws IOException,
607       InterruptedException, KeeperException, CoordinatedStateException {
608     this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
609     this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
610     this.normalizer.setMasterServices(this);
611     this.normalizer.setMasterRpcServices((MasterRpcServices)rpcServices);
612     this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
613     this.loadBalancerTracker.start();
614
615     this.regionNormalizerTracker = new RegionNormalizerTracker(zooKeeper, this);
616     this.regionNormalizerTracker.start();
617
618     this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
619     this.splitOrMergeTracker.start();
620
621     this.assignmentManager = new AssignmentManager(this, serverManager,
622       this.balancer, this.service, this.metricsMaster,
623       this.tableLockManager, tableStateManager);
624
625     this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager);
626     this.regionServerTracker.start();
627
628     this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
629     this.drainingServerTracker.start();
630
631     this.maintenanceModeTracker = new MasterMaintenanceModeTracker(zooKeeper);
632     this.maintenanceModeTracker.start();
633
634     // Set the cluster as up.  If new RSs, they'll be waiting on this before
635     // going ahead with their startup.
636     boolean wasUp = this.clusterStatusTracker.isClusterUp();
637     if (!wasUp) this.clusterStatusTracker.setClusterUp();
638
639     LOG.info("Server active/primary master=" + this.serverName +
640         ", sessionid=0x" +
641         Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
642         ", setting cluster-up flag (Was=" + wasUp + ")");
643
644     // create/initialize the snapshot manager and other procedure managers
645     this.snapshotManager = new SnapshotManager();
646     this.mpmHost = new MasterProcedureManagerHost();
647     this.mpmHost.register(this.snapshotManager);
648     this.mpmHost.register(new MasterFlushTableProcedureManager());
649     this.mpmHost.loadProcedures(conf);
650     this.mpmHost.initialize(this, this.metricsMaster);
651
652   }
653
654   /**
655    * Finish initialization of HMaster after becoming the primary master.
656    *
657    * <ol>
658    * <li>Initialize master components - file system manager, server manager,
659    *     assignment manager, region server tracker, etc</li>
660    * <li>Start necessary service threads - balancer, catalog janitor,
661    *     executor services, etc</li>
662    * <li>Set cluster as UP in ZooKeeper</li>
663    * <li>Wait for RegionServers to check-in</li>
664    * <li>Split logs and perform data recovery, if necessary</li>
665    * <li>Ensure assignment of meta/namespace regions</li>
666    * <li>Handle either fresh cluster start or master failover</li>
667    * </ol>
668    */
669   private void finishActiveMasterInitialization(MonitoredTask status)
670       throws IOException, InterruptedException, KeeperException, CoordinatedStateException {
671
672     isActiveMaster = true;
673     Thread zombieDetector = new Thread(new InitializationMonitor(this));
674     zombieDetector.start();
675
676     /*
677      * We are active master now... go initialize components we need to run.
678      * Note, there may be dross in zk from previous runs; it'll get addressed
679      * below after we determine if cluster startup or failover.
680      */
681
682     status.setStatus("Initializing Master file system");
683
684     this.masterActiveTime = System.currentTimeMillis();
685     // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
686     this.fileSystemManager = new MasterFileSystem(this);
687     this.walManager = new MasterWalManager(this);
688
689     // enable table descriptors cache
690     this.tableDescriptors.setCacheOn();
691     // set the META's descriptor to the correct replication
692     this.tableDescriptors.get(TableName.META_TABLE_NAME).setRegionReplication(
693         conf.getInt(HConstants.META_REPLICAS_NUM, HConstants.DEFAULT_META_REPLICA_NUM));
694     // warm-up HTDs cache on master initialization
695     if (preLoadTableDescriptors) {
696       status.setStatus("Pre-loading table descriptors");
697       this.tableDescriptors.getAll();
698     }
699
700     // publish cluster ID
701     status.setStatus("Publishing Cluster ID in ZooKeeper");
702     ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
703     this.initLatch.countDown();
704
705     this.serverManager = createServerManager(this);
706
707     // Invalidate all write locks held previously
708     this.tableLockManager.reapWriteLocks();
709     this.tableStateManager = new TableStateManager(this);
710
711     status.setStatus("Initializing ZK system trackers");
712     initializeZKBasedSystemTrackers();
713
714     // This is for backwards compatibility
715     // See HBASE-11393
716     status.setStatus("Update TableCFs node in ZNode");
717     TableCFsUpdater tableCFsUpdater = new TableCFsUpdater(zooKeeper,
718             conf, this.clusterConnection);
719     tableCFsUpdater.update();
720
721     // initialize master side coprocessors before we start handling requests
722     status.setStatus("Initializing master coprocessors");
723     this.cpHost = new MasterCoprocessorHost(this, this.conf);
724
725     // start up all service threads.
726     status.setStatus("Initializing master service threads");
727     startServiceThreads();
728
729     // Wake up this server to check in
730     sleeper.skipSleepCycle();
731
732     // Wait for region servers to report in
733     status.setStatus("Wait for region servers to report in");
734     waitForRegionServers(status);
735
736     // Get a list of previously failed RSs that need log splitting work.
737     // We recover hbase:meta region servers inside master initialization and
738     // handle other failed servers in SSH in order to start up the master node ASAP.
739     MasterMetaBootstrap metaBootstrap = createMetaBootstrap(this, status);
740     metaBootstrap.splitMetaLogsBeforeAssignment();
741
742     this.initializationBeforeMetaAssignment = true;
743
744     // Wait for regionserver to finish initialization.
745     if (BaseLoadBalancer.tablesOnMaster(conf)) {
746       waitForServerOnline();
747     }
748
749     // initialize load balancer
750     this.balancer.setClusterStatus(getClusterStatus());
751     this.balancer.setMasterServices(this);
752     this.balancer.initialize();
753
754     // Check if master is shutting down because of some issue
755     // in initializing the regionserver or the balancer.
756     if (isStopped()) return;
757
758     // Make sure meta assigned before proceeding.
759     status.setStatus("Assigning Meta Region");
760     metaBootstrap.assignMeta();
761
762     // Check if master is shutting down because the above assignMeta could return even if
763     // hbase:meta isn't assigned when master is shutting down
764     if (isStopped()) return;
765
766     // Migrate existing table states from zk so that splitters
767     // and the recovery process treat states properly.
768     for (Map.Entry<TableName, TableState.State> entry : ZKDataMigrator
769         .queryForTableStates(getZooKeeper()).entrySet()) {
770       LOG.info("Converting state from zk to new states:" + entry);
771       tableStateManager.setTableState(entry.getKey(), entry.getValue());
772     }
773     ZKUtil.deleteChildrenRecursively(getZooKeeper(), getZooKeeper().tableZNode);
774
775     status.setStatus("Submitting log splitting work for previously failed region servers");
776     metaBootstrap.processDeadServers();
777
778     // Fix up assignment manager status
779     status.setStatus("Starting assignment manager");
780     this.assignmentManager.joinCluster();
781
782     // set cluster status again after user regions are assigned
783     this.balancer.setClusterStatus(getClusterStatus());
784
785     // Start balancer and meta catalog janitor after meta and regions have been assigned.
786     status.setStatus("Starting balancer and catalog janitor");
787     this.clusterStatusChore = new ClusterStatusChore(this, balancer);
788     getChoreService().scheduleChore(clusterStatusChore);
789     this.balancerChore = new BalancerChore(this);
790     getChoreService().scheduleChore(balancerChore);
791     this.normalizerChore = new RegionNormalizerChore(this);
792     getChoreService().scheduleChore(normalizerChore);
793     this.catalogJanitorChore = new CatalogJanitor(this, this);
794     getChoreService().scheduleChore(catalogJanitorChore);
795
796     // Do Metrics periodically
797     periodicDoMetricsChore = new PeriodicDoMetrics(msgInterval, this);
798     getChoreService().scheduleChore(periodicDoMetricsChore);
799
800     status.setStatus("Starting cluster schema service");
801     initClusterSchemaService();
802
803     if (this.cpHost != null) {
804       try {
805         this.cpHost.preMasterInitialization();
806       } catch (IOException e) {
807         LOG.error("Coprocessor preMasterInitialization() hook failed", e);
808       }
809     }
810
811     status.markComplete("Initialization successful");
812     LOG.info("Master has completed initialization");
813     configurationManager.registerObserver(this.balancer);
814
815     // Set master as 'initialized'.
816     setInitialized(true);
817
818     status.setStatus("Assign meta replicas");
819     metaBootstrap.assignMetaReplicas();
820
821     status.setStatus("Starting quota manager");
822     initQuotaManager();
823
824     // Clear dead servers that have the same host name and port as an online server, because
825     // we do not remove a dead server that matches an RS trying to check in before
826     // master initialization. See HBASE-5916.
827     this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
828
829     // Check and set the znode ACLs if needed in case we are overtaking a non-secure configuration
830     status.setStatus("Checking ZNode ACLs");
831     zooKeeper.checkAndSetZNodeAcls();
832
833     status.setStatus("Initializing MOB Cleaner");
834     initMobCleaner();
835
836     status.setStatus("Calling postStartMaster coprocessors");
837     if (this.cpHost != null) {
838       // don't let cp initialization errors kill the master
839       try {
840         this.cpHost.postStartMaster();
841       } catch (IOException ioe) {
842         LOG.error("Coprocessor postStartMaster() hook failed", ioe);
843       }
844     }
845
846     zombieDetector.interrupt();
847   }
848
849   private void initMobCleaner() {
850     this.expiredMobFileCleanerChore = new ExpiredMobFileCleanerChore(this);
851     getChoreService().scheduleChore(expiredMobFileCleanerChore);
852
853     int mobCompactionPeriod = conf.getInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD,
854         MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD);
855     if (mobCompactionPeriod > 0) {
856       this.mobCompactChore = new MobCompactionChore(this, mobCompactionPeriod);
857       getChoreService().scheduleChore(mobCompactChore);
858     } else {
859       LOG.info("The period is " + mobCompactionPeriod
860         + " seconds, MobCompactionChore is disabled");
861     }
862     this.mobCompactThread = new MasterMobCompactionThread(this);
863   }
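
  // A minimal sketch of disabling the mob compaction chore through configuration;
  // per the check in initMobCleaner() above, a period of 0 disables the chore:
  //
  //   conf.setInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0);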
864
865   /**
866    * Create a {@link MasterMetaBootstrap} instance.
867    */
868   MasterMetaBootstrap createMetaBootstrap(final HMaster master, final MonitoredTask status) {
869     // We put this out here in a method so we can do a Mockito.spy and stub it out
870     // with a mocked-up MasterMetaBootstrap.
871     return new MasterMetaBootstrap(master, status);
872   }
873
874   /**
875    * Create a {@link ServerManager} instance.
876    */
877   ServerManager createServerManager(final MasterServices master) throws IOException {
878     // We put this out here in a method so we can do a Mockito.spy and stub it out
879     // with a mocked-up ServerManager.
880     setupClusterConnection();
881     return new ServerManager(master);
882   }
883
884   private void waitForRegionServers(final MonitoredTask status)
885       throws IOException, InterruptedException {
886     this.serverManager.waitForRegionServers(status);
887     // Check zk for region servers that are up but didn't register
888     for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
889       // The isServerOnline check is opportunistic, correctness is handled inside
890       if (!this.serverManager.isServerOnline(sn)
891           && serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
892         LOG.info("Registered server found up in zk but who has not yet reported in: " + sn);
893       }
894     }
895   }
896
897   void initClusterSchemaService() throws IOException, InterruptedException {
898     this.clusterSchemaService = new ClusterSchemaServiceImpl(this);
899     this.clusterSchemaService.startAndWait();
900     if (!this.clusterSchemaService.isRunning()) throw new HBaseIOException("Failed start");
901   }
902
903   void initQuotaManager() throws IOException {
904     MasterQuotaManager quotaManager = new MasterQuotaManager(this);
905     this.assignmentManager.setRegionStateListener((RegionStateListener)quotaManager);
906     quotaManager.start();
907     this.quotaManager = quotaManager;
908   }
909
910   boolean isCatalogJanitorEnabled() {
911     return catalogJanitorChore != null ?
912       catalogJanitorChore.getEnabled() : false;
913   }
914
915   @Override
916   public TableDescriptors getTableDescriptors() {
917     return this.tableDescriptors;
918   }
919
920   @Override
921   public ServerManager getServerManager() {
922     return this.serverManager;
923   }
924
925   @Override
926   public MasterFileSystem getMasterFileSystem() {
927     return this.fileSystemManager;
928   }
929
930   @Override
931   public MasterWalManager getMasterWalManager() {
932     return this.walManager;
933   }
934
935   @Override
936   public TableStateManager getTableStateManager() {
937     return tableStateManager;
938   }
939
940   /*
941    * Start up all services. If any of these threads gets an unhandled exception
942    * then they just die with a logged message.  This should be fine because
943    * in general, we do not expect the master to get such unhandled exceptions
944    * as OOMEs; it should be lightly loaded. See what HRegionServer does if you
945    * need to install an unhandled exception handler.
946    */
947   private void startServiceThreads() throws IOException {
948    // Start the executor service pools
949    this.service.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
950       conf.getInt("hbase.master.executor.openregion.threads", 5));
951    this.service.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
952       conf.getInt("hbase.master.executor.closeregion.threads", 5));
953    this.service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
954       conf.getInt("hbase.master.executor.serverops.threads", 5));
955    this.service.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
956       conf.getInt("hbase.master.executor.meta.serverops.threads", 5));
957    this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
958       conf.getInt("hbase.master.executor.logreplayops.threads", 10));
959
960    // We depend on there being only one instance of this executor running
961    // at a time.  To allow concurrency, we would need fencing of enable/disable of
962    // tables.
963    // Any time you change this maxThreads to > 1, please see the comment at
964    // AccessController#postCompletedCreateTableAction
965    this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
966    startProcedureExecutor();
967
968    // Start log cleaner thread
969    int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
970    this.logCleaner =
971       new LogCleaner(cleanerInterval,
972          this, conf, getMasterWalManager().getFileSystem(),
973          getMasterWalManager().getOldLogDir());
974     getChoreService().scheduleChore(logCleaner);
975
976    // start the hfile archive cleaner thread
977     Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
978     Map<String, Object> params = new HashMap<String, Object>();
979     params.put(MASTER, this);
980     this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
981         .getFileSystem(), archiveDir, params);
982     getChoreService().scheduleChore(hfileCleaner);
983     serviceStarted = true;
984     if (LOG.isTraceEnabled()) {
985       LOG.trace("Started service threads");
986     }
987     if (conf.getClass("hbase.region.replica.replication.replicationQueues.class",
988         ReplicationFactory.defaultReplicationQueueClass) == ReplicationQueuesZKImpl.class && !conf
989         .getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
990       try {
991         replicationZKLockCleanerChore = new ReplicationZKLockCleanerChore(this, this,
992             cleanerInterval, this.getZooKeeper(), this.conf);
993         getChoreService().scheduleChore(replicationZKLockCleanerChore);
994       } catch (Exception e) {
995         LOG.error("start replicationZKLockCleanerChore failed", e);
996       }
997     }
998     replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval);
999     getChoreService().scheduleChore(replicationMetaCleaner);
1000   }
1001
1002   @Override
1003   protected void sendShutdownInterrupt() {
1004     super.sendShutdownInterrupt();
1005     stopProcedureExecutor();
1006   }
1007
1008   @Override
1009   protected void stopServiceThreads() {
1010     if (masterJettyServer != null) {
1011       LOG.info("Stopping master jetty server");
1012       try {
1013         masterJettyServer.stop();
1014       } catch (Exception e) {
1015         LOG.error("Failed to stop master jetty server", e);
1016       }
1017     }
1018     super.stopServiceThreads();
1019     stopChores();
1020
1021     // Wait for all the remaining region servers to report in IFF we were
1022     // running a cluster shutdown AND we were NOT aborting.
1023     if (!isAborted() && this.serverManager != null &&
1024         this.serverManager.isClusterShutdown()) {
1025       this.serverManager.letRegionServersShutdown();
1026     }
1027     if (LOG.isDebugEnabled()) {
1028       LOG.debug("Stopping service threads");
1029     }
1030     // Clean up and close up shop
1031     if (this.logCleaner != null) this.logCleaner.cancel(true);
1032     if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
1033     if (this.replicationZKLockCleanerChore != null) this.replicationZKLockCleanerChore.cancel(true);
1034     if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true);
1035     if (this.quotaManager != null) this.quotaManager.stop();
1036     if (this.activeMasterManager != null) this.activeMasterManager.stop();
1037     if (this.serverManager != null) this.serverManager.stop();
1038     if (this.assignmentManager != null) this.assignmentManager.stop();
1039     if (this.walManager != null) this.walManager.stop();
1040     if (this.fileSystemManager != null) this.fileSystemManager.stop();
1041     if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
1042   }
1043
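  // A minimal sketch of sizing the procedure executor through configuration; the
  // key comes from MasterProcedureConstants and the value is an illustrative assumption:
  //
  //   conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 8);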
1044   private void startProcedureExecutor() throws IOException {
1045     final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
1046     final Path logDir = new Path(fileSystemManager.getRootDir(),
1047         MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
1048
1049     procedureStore = new WALProcedureStore(conf, fileSystemManager.getFileSystem(), logDir,
1050         new MasterProcedureEnv.WALStoreLeaseRecovery(this));
1051     procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
1052     procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore,
1053         procEnv.getProcedureQueue());
1054
1055     final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
1056         Math.max(Runtime.getRuntime().availableProcessors(),
1057           MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
1058     final boolean abortOnCorruption = conf.getBoolean(
1059         MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
1060         MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
1061     procedureStore.start(numThreads);
1062     procedureExecutor.start(numThreads, abortOnCorruption);
1063   }
1064
1065   private void stopProcedureExecutor() {
1066     if (procedureExecutor != null) {
1067       procedureExecutor.stop();
1068     }
1069
1070     if (procedureStore != null) {
1071       procedureStore.stop(isAborted());
1072     }
1073   }
1074
1075   private void stopChores() {
1076     if (this.expiredMobFileCleanerChore != null) {
1077       this.expiredMobFileCleanerChore.cancel(true);
1078     }
1079     if (this.mobCompactChore != null) {
1080       this.mobCompactChore.cancel(true);
1081     }
1082     if (this.balancerChore != null) {
1083       this.balancerChore.cancel(true);
1084     }
1085     if (this.normalizerChore != null) {
1086       this.normalizerChore.cancel(true);
1087     }
1088     if (this.clusterStatusChore != null) {
1089       this.clusterStatusChore.cancel(true);
1090     }
1091     if (this.catalogJanitorChore != null) {
1092       this.catalogJanitorChore.cancel(true);
1093     }
1094     if (this.clusterStatusPublisherChore != null){
1095       clusterStatusPublisherChore.cancel(true);
1096     }
1097     if (this.mobCompactThread != null) {
1098       this.mobCompactThread.close();
1099     }
1100
1101     if (this.periodicDoMetricsChore != null) {
1102       periodicDoMetricsChore.cancel();
1103     }
1104   }
1105
1106   /**
1107    * @return the remote side's InetAddress
1108    */
1109   InetAddress getRemoteInetAddress(final int port,
1110       final long serverStartCode) throws UnknownHostException {
1111     // Do it out here in its own little method so we can fake an address when
1112     // mocking up in tests.
1113     InetAddress ia = RpcServer.getRemoteIp();
1114
1115     // The call could be from the local regionserver,
1116     // in which case, there is no remote address.
1117     if (ia == null && serverStartCode == startcode) {
1118       InetSocketAddress isa = rpcServices.getSocketAddress();
1119       if (isa != null && isa.getPort() == port) {
1120         ia = isa.getAddress();
1121       }
1122     }
1123     return ia;
1124   }
1125
1126   /**
1127    * @return Maximum time in milliseconds that we should run the balancer for
1128    */
1129   private int getBalancerCutoffTime() {
1130     int balancerCutoffTime = getConfiguration().getInt("hbase.balancer.max.balancing", -1);
1131     if (balancerCutoffTime == -1) {
1132       // if the cutoff time isn't set, default it to the balancer period
1133       int balancerPeriod = getConfiguration().getInt("hbase.balancer.period", 300000);
1134       balancerCutoffTime = balancerPeriod;
1135     }
1136     return balancerCutoffTime;
1137   }
1138
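  // A minimal client-side sketch of toggling and triggering the balancer backed by
  // the methods below (assuming an HBase 1.x Admin named 'admin'; illustrative only):
  //
  //   admin.setBalancerRunning(true, false); // turn the balancer switch on
  //   boolean ran = admin.balancer();        // ask the active master for one balance run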
1139   public boolean balance() throws IOException {
1140     return balance(false);
1141   }
1142
1143   public boolean balance(boolean force) throws IOException {
1144     // if master not initialized, don't run balancer.
1145     if (!isInitialized()) {
1146       LOG.debug("Master has not been initialized, don't run balancer.");
1147       return false;
1148     }
1149
1150     if (isInMaintenanceMode()) {
1151       LOG.info("Master is in maintenanceMode mode, don't run balancer.");
1152       return false;
1153     }
1154
1155     // Do this call outside of synchronized block.
1156     int maximumBalanceTime = getBalancerCutoffTime();
1157     synchronized (this.balancer) {
1158       // If the balancer switch is off, don't run the balancer.
1159       if (!this.loadBalancerTracker.isBalancerOn()) return false;
1160       // Only allow one balance run at a time.
1161       if (this.assignmentManager.getRegionStates().isRegionsInTransition()) {
1162         Set<RegionState> regionsInTransition =
1163           this.assignmentManager.getRegionStates().getRegionsInTransition();
1164         // if hbase:meta region is in transition, result of assignment cannot be recorded
1165         // ignore the force flag in that case
1166         boolean metaInTransition = assignmentManager.getRegionStates().isMetaRegionInTransition();
1167         String prefix = force && !metaInTransition ? "R" : "Not r";
1168         LOG.debug(prefix + "unning balancer because " + regionsInTransition.size() +
1169           " region(s) in transition: " + org.apache.commons.lang.StringUtils.
1170             abbreviate(regionsInTransition.toString(), 256));
1171         if (!force || metaInTransition) return false;
1172       }
1173       if (this.serverManager.areDeadServersInProgress()) {
1174         LOG.debug("Not running balancer because processing dead regionserver(s): " +
1175           this.serverManager.getDeadServers());
1176         return false;
1177       }
1178
1179       if (this.cpHost != null) {
1180         try {
1181           if (this.cpHost.preBalance()) {
1182             LOG.debug("Coprocessor bypassing balancer request");
1183             return false;
1184           }
1185         } catch (IOException ioe) {
1186           LOG.error("Error invoking master coprocessor preBalance()", ioe);
1187           return false;
1188         }
1189       }
1190
1191       Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable =
1192         this.assignmentManager.getRegionStates().getAssignmentsByTable();
1193
1194       List<RegionPlan> plans = new ArrayList<RegionPlan>();
1195
1196       // Give the balancer the current cluster state.
1197       this.balancer.setClusterStatus(getClusterStatus());
1198       for (Entry<TableName, Map<ServerName, List<HRegionInfo>>> e : assignmentsByTable.entrySet()) {
1199         List<RegionPlan> partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue());
1200         if (partialPlans != null) plans.addAll(partialPlans);
1201       }
1202
1203       long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
1204       int rpCount = 0;  // number of RegionPlans balanced so far
1205       long totalRegPlanExecTime = 0;
1206       if (plans != null && !plans.isEmpty()) {
1207         for (RegionPlan plan: plans) {
1208           LOG.info("balance " + plan);
1209           long balStartTime = System.currentTimeMillis();
1210           //TODO: bulk assign
1211           this.assignmentManager.balance(plan);
1212           totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;
1213           rpCount++;
1214           if (rpCount < plans.size() &&
1215               // if performing next balance exceeds cutoff time, exit the loop
1216               (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
1217             //TODO: After balance, there should not be a cutoff time (keeping it as
1218             // a safety net for now)
1219             LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
1220               maximumBalanceTime);
1221             break;
1222           }
1223         }
1224       }
1225       if (this.cpHost != null) {
1226         try {
1227           this.cpHost.postBalance(rpCount < plans.size() ? plans.subList(0, rpCount) : plans);
1228         } catch (IOException ioe) {
1229           // balancing already succeeded so don't change the result
1230           LOG.error("Error invoking master coprocessor postBalance()", ioe);
1231         }
1232       }
1233     }
1234     // If LoadBalancer did not generate any plans, it means the cluster is already balanced.
1235     // Return true indicating a success.
1236     return true;
1237   }
1238
1239   @Override
1240   @VisibleForTesting
1241   public RegionNormalizer getRegionNormalizer() {
1242     return this.normalizer;
1243   }
1244
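  // A minimal client-side sketch of driving the normalizer below (assuming an
  // HBase Admin named 'admin'; both calls exist since 1.2; illustrative only):
  //
  //   admin.setNormalizerRunning(true); // flip the global normalizer switch
  //   boolean ran = admin.normalize();  // ask the master for one normalization pass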
1245   /**
1246    * Perform normalization of cluster (invoked by {@link RegionNormalizerChore}).
1247    *
1248    * @return true if normalization step was performed successfully, false otherwise
1249    *    (specifically, if HMaster hasn't been initialized properly or normalization
1250    *    is globally disabled)
1251    */
1252   public boolean normalizeRegions() throws IOException {
1253     if (!isInitialized()) {
1254       LOG.debug("Master has not been initialized, don't run region normalizer.");
1255       return false;
1256     }
1257
1258     if (isInMaintenanceMode()) {
1259       LOG.info("Master is in maintenance mode, don't run region normalizer.");
1260       return false;
1261     }
1262
1263     if (!this.regionNormalizerTracker.isNormalizerOn()) {
1264       LOG.debug("Region normalization is disabled, don't run region normalizer.");
1265       return false;
1266     }
1267
1268     synchronized (this.normalizer) {
1269       // Don't run the normalizer concurrently
1270       List<TableName> allEnabledTables = new ArrayList<>(
1271         this.tableStateManager.getTablesInStates(TableState.State.ENABLED));
1272
1273       Collections.shuffle(allEnabledTables);
1274
1275       for (TableName table : allEnabledTables) {
1276         if (isInMaintenanceMode()) {
1277           LOG.debug("Master is in maintenance mode, stop running region normalizer.");
1278           return false;
1279         }
1280
1281         HTableDescriptor tblDesc = getTableDescriptors().get(table);
1282         if (table.isSystemTable() || (tblDesc != null &&
1283             !tblDesc.isNormalizationEnabled())) {
1284         LOG.debug("Skipping normalization for table: " + table + ", as it is either a system"
1285             + " table or does not have normalization enabled");
1286           continue;
1287         }
1288         List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table);
1289         if (plans != null) {
1290           for (NormalizationPlan plan : plans) {
1291             plan.execute(clusterConnection.getAdmin());
1292             if (plan.getType() == PlanType.SPLIT) {
1293               splitPlanCount++;
1294             } else if (plan.getType() == PlanType.MERGE) {
1295               mergePlanCount++;
1296             }
1297           }
1298         }
1299       }
1300     }
1301     // If the normalizer did not generate any plans, no normalization was needed.
1302     // Return true to indicate success.
1303     return true;
1304   }
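  // Sketch of how a client would opt a table into the normalization performed
  // above, assuming the 1.x Admin API (the admin and tableName handles are
  // assumed to be in scope):
  //
  //   HTableDescriptor htd = admin.getTableDescriptor(tableName);
  //   htd.setNormalizationEnabled(true);
  //   admin.modifyTable(tableName, htd);
  //   admin.setNormalizerRunning(true);  // flips the global switch checked above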
1305
1306   /**
1307    * @return Client info for use as prefix on an audit log string; who did an action
1308    */
1309   String getClientIdAuditPrefix() {
1310     return "Client=" + RpcServer.getRequestUserName() + "/" + RpcServer.getRemoteAddress();
1311   }
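  // For example, an audit line typically begins like "Client=alice//10.0.0.5"
  // (InetAddress#toString renders as "hostname/ip", or just "/ip" when the
  // hostname is unresolved).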
1312
1313   /**
1314    * Switch for the background CatalogJanitor thread.
1315    * Used for testing. The thread will continue to run; it will simply be a
1316    * no-op if disabled.
1317    * @param b If false, the catalog janitor won't do anything.
1318    */
1319   public void setCatalogJanitorEnabled(final boolean b) {
1320     this.catalogJanitorChore.setEnabled(b);
1321   }
1322
1323   @Override
1324   public long dispatchMergingRegions(
1325       final HRegionInfo regionInfoA,
1326       final HRegionInfo regionInfoB,
1327       final boolean forcible,
1328       final long nonceGroup,
1329       final long nonce) throws IOException {
1330     checkInitialized();
1331
1332     TableName tableName = regionInfoA.getTable();
1333     if (tableName == null || regionInfoB.getTable() == null) {
1334       throw new UnknownRegionException("Can't merge regions without an associated table");
1335     }
1336
1337     if (!tableName.equals(regionInfoB.getTable())) {
1338       throw new IOException("Cannot merge regions from two different tables");
1339     }
1340
1341     if (regionInfoA.compareTo(regionInfoB) == 0) {
1342       throw new MergeRegionException(
1343         "Cannot merge a region to itself " + regionInfoA + ", " + regionInfoB);
1344     }
1345
1346     HRegionInfo[] regionsToMerge = new HRegionInfo[2];
1347     regionsToMerge[0] = regionInfoA;
1348     regionsToMerge[1] = regionInfoB;
1349
1350     if (cpHost != null) {
1351       cpHost.preDispatchMerge(regionInfoA, regionInfoB);
1352     }
1353
1354     LOG.info(getClientIdAuditPrefix() + " Merge regions "
1355         + regionInfoA.getEncodedName() + " and " + regionInfoB.getEncodedName());
1356
1357     long procId = this.procedureExecutor.submitProcedure(
1358       new DispatchMergingRegionsProcedure(
1359         procedureExecutor.getEnvironment(), tableName, regionsToMerge, forcible),
1360       nonceGroup,
1361       nonce);
1362
1363     if (cpHost != null) {
1364       cpHost.postDispatchMerge(regionInfoA, regionInfoB);
1365     }
1366     return procId;
1367   }
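  // Sketch of the client-side entry point for this merge, assuming the 1.x
  // Admin API (regionA/regionB are HRegionInfo handles the caller obtained):
  //
  //   admin.mergeRegions(regionA.getEncodedNameAsBytes(),
  //       regionB.getEncodedNameAsBytes(),
  //       false /* forcible: set true to merge non-adjacent regions */);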
1368
1369   void move(final byte[] encodedRegionName,
1370       final byte[] destServerName) throws HBaseIOException {
1371     RegionState regionState = assignmentManager.getRegionStates().
1372       getRegionState(Bytes.toString(encodedRegionName));
1373
1374     HRegionInfo hri;
1375     if (regionState != null) {
1376       hri = regionState.getRegion();
1377     } else {
1378       throw new UnknownRegionException(Bytes.toStringBinary(encodedRegionName));
1379     }
1380
1381     ServerName dest;
1382     if (destServerName == null || destServerName.length == 0) {
1383       LOG.info("Passed destination servername is null/empty so " +
1384         "choosing a server at random");
1385       final List<ServerName> destServers = this.serverManager.createDestinationServersList(
1386         regionState.getServerName());
1387       dest = balancer.randomAssignment(hri, destServers);
1388       if (dest == null) {
1389         LOG.debug("Unable to determine a plan to assign " + hri);
1390         return;
1391       }
1392     } else {
1393       ServerName candidate = ServerName.valueOf(Bytes.toString(destServerName));
1394       dest = balancer.randomAssignment(hri, Lists.newArrayList(candidate));
1395       if (dest == null) {
1396         LOG.debug("Unable to determine a plan to assign " + hri);
1397         return;
1398       }
1399       if (dest.equals(serverName) && balancer instanceof BaseLoadBalancer
1400           && !((BaseLoadBalancer)balancer).shouldBeOnMaster(hri)) {
1401         // Don't put user regions on the master, to avoid the balancer having to
1402         // move them off again later. A test may still place regions on the master
1403         // intentionally, however.
1404         LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1405           + " to avoid unnecessary region moving later by load balancer,"
1406           + " because it should not be on master");
1407         return;
1408       }
1409     }
1410
1411     if (dest.equals(regionState.getServerName())) {
1412       LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1413         + " because region already assigned to the same server " + dest + ".");
1414       return;
1415     }
1416
1417     // Now we can do the move
1418     RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest);
1419
1420     try {
1421       checkInitialized();
1422       if (this.cpHost != null) {
1423         if (this.cpHost.preMove(hri, rp.getSource(), rp.getDestination())) {
1424           return;
1425         }
1426       }
1427       // Warm up the region on the destination before initiating the move. This call
1428       // is synchronous and takes some time, so do it before the source region gets
1429       // closed.
1430       serverManager.sendRegionWarmup(rp.getDestination(), hri);
1431
1432       LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
1433       this.assignmentManager.balance(rp);
1434       if (this.cpHost != null) {
1435         this.cpHost.postMove(hri, rp.getSource(), rp.getDestination());
1436       }
1437     } catch (IOException ioe) {
1438       if (ioe instanceof HBaseIOException) {
1439         throw (HBaseIOException)ioe;
1440       }
1441       throw new HBaseIOException(ioe);
1442     }
1443   }
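  // Corresponding client call, assuming the 1.x Admin API; a null/empty
  // destination makes the code above pick a server at random:
  //
  //   admin.move(hri.getEncodedNameAsBytes(),
  //       Bytes.toBytes(destServer.getServerName()));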
1444
1445   @Override
1446   public long createTable(
1447       final HTableDescriptor hTableDescriptor,
1448       final byte [][] splitKeys,
1449       final long nonceGroup,
1450       final long nonce) throws IOException {
1451     if (isStopped()) {
1452       throw new MasterNotRunningException();
1453     }
1454     checkInitialized();
1455     String namespace = hTableDescriptor.getTableName().getNamespaceAsString();
1456     // Ensure the namespace exists; this throws if it does not.
1457     this.clusterSchemaService.getNamespace(namespace);
1458
1459     HRegionInfo[] newRegions = ModifyRegionUtils.createHRegionInfos(hTableDescriptor, splitKeys);
1460     sanityCheckTableDescriptor(hTableDescriptor);
1461     if (cpHost != null) {
1462       cpHost.preCreateTable(hTableDescriptor, newRegions);
1463     }
1464     LOG.info(getClientIdAuditPrefix() + " create " + hTableDescriptor);
1465
1466     // TODO: We can handle/merge duplicate requests, and differentiate the case of
1467     //       TableExistsException by saying if the schema is the same or not.
1468     ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
1469     long procId = this.procedureExecutor.submitProcedure(
1470       new CreateTableProcedure(
1471         procedureExecutor.getEnvironment(), hTableDescriptor, newRegions, latch),
1472       nonceGroup,
1473       nonce);
1474     latch.await();
1475
1476     if (cpHost != null) {
1477       cpHost.postCreateTable(hTableDescriptor, newRegions);
1478     }
1479
1480     return procId;
1481   }
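  // Client-side counterpart, assuming the 1.x Admin API; Admin#createTable
  // blocks until the table is created:
  //
  //   HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));
  //   htd.addFamily(new HColumnDescriptor("cf"));
  //   byte[][] splitKeys = { Bytes.toBytes("m") };  // pre-split into two regions
  //   admin.createTable(htd, splitKeys);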
1482
1483   @Override
1484   public long createSystemTable(final HTableDescriptor hTableDescriptor) throws IOException {
1485     if (isStopped()) {
1486       throw new MasterNotRunningException();
1487     }
1488
1489     TableName tableName = hTableDescriptor.getTableName();
1490     if (!(tableName.isSystemTable())) {
1491       throw new IllegalArgumentException(
1492         "Only system table creation can use this createSystemTable API");
1493     }
1494
1495     HRegionInfo[] newRegions = ModifyRegionUtils.createHRegionInfos(hTableDescriptor, null);
1496
1497     LOG.info(getClientIdAuditPrefix() + " create " + hTableDescriptor);
1498
1499     // This special create table is invoked locally on the master, so there is no RPC
1500     // and hence no need for a nonce to detect duplicate calls.
1501     long procId = this.procedureExecutor.submitProcedure(
1502       new CreateTableProcedure(procedureExecutor.getEnvironment(), hTableDescriptor, newRegions));
1503
1504     return procId;
1505   }
1506
1507   /**
1508    * Checks whether the table descriptor conforms to sane limits, and that configured
1509    * values (compression, encryption, etc.) are usable.
1510    * @throws IOException if a check fails and sanity checks are not set to warn-only
1511    */
1512   private void sanityCheckTableDescriptor(final HTableDescriptor htd) throws IOException {
1513     final String CONF_KEY = "hbase.table.sanity.checks";
1514     boolean logWarn = false;
1515     if (!conf.getBoolean(CONF_KEY, true)) {
1516       logWarn = true;
1517     }
1518     String tableVal = htd.getConfigurationValue(CONF_KEY);
1519     if (tableVal != null && !Boolean.valueOf(tableVal)) {
1520       logWarn = true;
1521     }
1522
1523     // check max file size
1524     long maxFileSizeLowerLimit = 2 * 1024 * 1024L; // 2M is the default lower limit
1525     long maxFileSize = htd.getMaxFileSize();
1526     if (maxFileSize < 0) {
1527       maxFileSize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, maxFileSizeLowerLimit);
1528     }
1529     if (maxFileSize < conf.getLong("hbase.hregion.max.filesize.limit", maxFileSizeLowerLimit)) {
1530       String message = "MAX_FILESIZE for table descriptor or "
1531           + "\"hbase.hregion.max.filesize\" (" + maxFileSize
1532           + ") is too small, which might cause over splitting into unmanageable "
1533           + "number of regions.";
1534       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1535     }
1536
1537     // check flush size
1538     long flushSizeLowerLimit = 1024 * 1024L; // 1M is the default lower limit
1539     long flushSize = htd.getMemStoreFlushSize();
1540     if (flushSize < 0) {
1541       flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeLowerLimit);
1542     }
1543     if (flushSize < conf.getLong("hbase.hregion.memstore.flush.size.limit", flushSizeLowerLimit)) {
1544       String message = "MEMSTORE_FLUSHSIZE for table descriptor or "
1545           + "\"hbase.hregion.memstore.flush.size\" (" + flushSize + ") is too small, which might cause"
1546           + " very frequent flushing.";
1547       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1548     }
1549
1550     // check that coprocessors and other specified plugin classes can be loaded
1551     try {
1552       checkClassLoading(conf, htd);
1553     } catch (Exception ex) {
1554       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, ex.getMessage(), null);
1555     }
1556
1557     // check compression can be loaded
1558     try {
1559       checkCompression(htd);
1560     } catch (IOException e) {
1561       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
1562     }
1563
1564     // check encryption can be loaded
1565     try {
1566       checkEncryption(conf, htd);
1567     } catch (IOException e) {
1568       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
1569     }
1570     // Verify compaction policy
1571     try {
1572       checkCompactionPolicy(conf, htd);
1573     } catch (IOException e) {
1574       warnOrThrowExceptionForFailure(false, CONF_KEY, e.getMessage(), e);
1575     }
1576     // check that we have at least 1 CF
1577     if (htd.getColumnFamilies().length == 0) {
1578       String message = "Table should have at least one column family.";
1579       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1580     }
1581
1582     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1583       if (hcd.getTimeToLive() <= 0) {
1584         String message = "TTL for column family " + hcd.getNameAsString() + " must be positive.";
1585         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1586       }
1587
1588       // check blockSize
1589       if (hcd.getBlocksize() < 1024 || hcd.getBlocksize() > 16 * 1024 * 1024) {
1590         String message = "Block size for column family " + hcd.getNameAsString()
1591             + " must be between 1K and 16MB.";
1592         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1593       }
1594
1595       // check versions
1596       if (hcd.getMinVersions() < 0) {
1597         String message = "Min versions for column family " + hcd.getNameAsString()
1598           + " must be non-negative.";
1599         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1600       }
1601       // max versions already being checked
1602
1603       // HBASE-13776 Setting illegal versions for HColumnDescriptor
1604       //  does not throw IllegalArgumentException
1605       // check minVersions <= maxVersions
1606       if (hcd.getMinVersions() > hcd.getMaxVersions()) {
1607         String message = "Min versions for column family " + hcd.getNameAsString()
1608             + " must not be greater than the max versions.";
1609         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1610       }
1611
1612       // check replication scope
1613       checkReplicationScope(hcd);
1614
1615       // check data replication factor, it can be 0(default value) when user has not explicitly
1616       // set the value, in this case we use default replication factor set in the file system.
1617       if (hcd.getDFSReplication() < 0) {
1618         String message = "HFile replication for column family " + hcd.getNameAsString()
1619             + " must be non-negative (0 means use the file system default).";
1620         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1621       }
1622
1623       // TODO: should we check coprocessors and encryption ?
1624     }
1625   }
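  // The checks above can be downgraded to warnings either globally or per
  // table, matching the CONF_KEY lookups at the top of the method:
  //
  //   conf.setBoolean("hbase.table.sanity.checks", false);          // cluster-wide
  //   htd.setConfiguration("hbase.table.sanity.checks", "false");   // per table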
1626
1627   private void checkReplicationScope(HColumnDescriptor hcd) throws IOException{
1628     // check replication scope
1629     WALProtos.ScopeType scope = WALProtos.ScopeType.valueOf(hcd.getScope());
1630     if (scope == null) {
1631       String message = "Replication scope for column family "
1632           + hcd.getNameAsString() + " is " + hcd.getScope() + " which is invalid.";
1633
1634       LOG.error(message);
1635       throw new DoNotRetryIOException(message);
1636     }
1637   }
1638
1639   private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd)
1640       throws IOException {
1641     // FIFO compaction has some requirements; note that FIFOCompactionPolicy
1642     // ignores periodic major compactions.
1643     String className =
1644         htd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
1645     if (className == null) {
1646       className =
1647           conf.get(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
1648             ExploringCompactionPolicy.class.getName());
1649     }
1650
1651     int blockingFileCount = HStore.DEFAULT_BLOCKING_STOREFILE_COUNT;
1652     String sv = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
1653     if (sv != null) {
1654       blockingFileCount = Integer.parseInt(sv);
1655     } else {
1656       blockingFileCount = conf.getInt(HStore.BLOCKING_STOREFILES_KEY, blockingFileCount);
1657     }
1658
1659     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1660       String compactionPolicy =
1661           hcd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
1662       if (compactionPolicy == null) {
1663         compactionPolicy = className;
1664       }
1665       if (!compactionPolicy.equals(FIFOCompactionPolicy.class.getName())) {
1666         continue;
1667       }
1668       // FIFOCompaction
1669       String message = null;
1670
1671       // 1. Check TTL
1672       if (hcd.getTimeToLive() == HColumnDescriptor.DEFAULT_TTL) {
1673         message = "Default TTL is not supported for FIFO compaction";
1674         throw new IOException(message);
1675       }
1676
1677       // 2. Check min versions
1678       if (hcd.getMinVersions() > 0) {
1679         message = "MIN_VERSION > 0 is not supported for FIFO compaction";
1680         throw new IOException(message);
1681       }
1682
1683       // 3. blocking file count
1684       String sbfc = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
1685       if (sbfc != null) {
1686         blockingFileCount = Integer.parseInt(sbfc);
1687       }
1688       if (blockingFileCount < 1000) {
1689         message =
1690             "blocking file count '" + HStore.BLOCKING_STOREFILES_KEY + "' " + blockingFileCount
1691                 + " is below recommended minimum of 1000";
1692         throw new IOException(message);
1693       }
1694     }
1695   }
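  // A column family setup that satisfies the FIFO checks above (sketch): an
  // explicit TTL, MIN_VERSIONS of 0, and at least 1000 blocking store files:
  //
  //   htd.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
  //       FIFOCompactionPolicy.class.getName());
  //   htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "1000");
  //   hcd.setTimeToLive(24 * 60 * 60);  // seconds; must differ from DEFAULT_TTL
  //   hcd.setMinVersions(0);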
1696
1697   // HBASE-13350 - Helper method to log warning on sanity check failures if checks disabled.
1698   private static void warnOrThrowExceptionForFailure(boolean logWarn, String confKey,
1699       String message, Exception cause) throws IOException {
1700     if (!logWarn) {
1701       throw new DoNotRetryIOException(message + " Set " + confKey +
1702           " to false at conf or table descriptor if you want to bypass sanity checks", cause);
1703     }
1704     LOG.warn(message);
1705   }
1706
1707   private void startActiveMasterManager(int infoPort) throws KeeperException {
1708     String backupZNode = ZKUtil.joinZNode(
1709       zooKeeper.backupMasterAddressesZNode, serverName.toString());
1710     /*
1711      * Add a ZNode for ourselves in the backup master directory since we
1712      * may not become the active master. If we do not, we want the actual
1713      * active master to know we are backup masters, so that it won't assign
1714      * regions to us if so configured.
1715      *
1716      * If we become the active master later, ActiveMasterManager will delete
1717      * this node explicitly.  If we crash before then, ZooKeeper will delete
1718      * this node for us since it is ephemeral.
1719      */
1720     LOG.info("Adding backup master ZNode " + backupZNode);
1721     if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode,
1722         serverName, infoPort)) {
1723       LOG.warn("Failed create of " + backupZNode + " by " + serverName);
1724     }
1725
1726     activeMasterManager.setInfoPort(infoPort);
1727     // Start a thread to try to become the active master, so we won't block here
1728     Threads.setDaemonThreadRunning(new Thread(new Runnable() {
1729       @Override
1730       public void run() {
1731         int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT,
1732           HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
1733         // If we're a backup master, stall until the primary writes its address
1734         if (conf.getBoolean(HConstants.MASTER_TYPE_BACKUP,
1735           HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
1736           LOG.debug("HMaster started in backup mode. "
1737             + "Stalling until master znode is written.");
1738           // This will only be a minute or so while the cluster starts up,
1739           // so don't worry about setting watches on the parent znode
1740           while (!activeMasterManager.hasActiveMaster()) {
1741             LOG.debug("Waiting for master address ZNode to be written "
1742               + "(Also watching cluster state node)");
1743             Threads.sleep(timeout);
1744           }
1745         }
1746         MonitoredTask status = TaskMonitor.get().createStatus("Master startup");
1747         status.setDescription("Master startup");
1748         try {
1749           if (activeMasterManager.blockUntilBecomingActiveMaster(timeout, status)) {
1750             finishActiveMasterInitialization(status);
1751           }
1752         } catch (Throwable t) {
1753           status.setStatus("Failed to become active: " + t.getMessage());
1754           LOG.fatal("Failed to become active master", t);
1755           // HBASE-5680: Likely hadoop23 vs hadoop 20.x/1.x incompatibility
1756           if (t instanceof NoClassDefFoundError &&
1757             t.getMessage()
1758               .contains("org/apache/hadoop/hdfs/protocol/HdfsConstants$SafeModeAction")) {
1759             // improved error message for this special case
1760             abort("HBase is having a problem with its Hadoop jars.  You may need to "
1761               + "recompile HBase against Hadoop version "
1762               + org.apache.hadoop.util.VersionInfo.getVersion()
1763               + " or change your hadoop jars to start properly", t);
1764           } else {
1765             abort("Unhandled exception. Starting shutdown.", t);
1766           }
1767         } finally {
1768           status.cleanup();
1769         }
1770       }
1771     }, getServerName().toShortString() + ".activeMasterManager"));
1772   }
1773
1774   private void checkCompression(final HTableDescriptor htd)
1775   throws IOException {
1776     if (!this.masterCheckCompression) return;
1777     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1778       checkCompression(hcd);
1779     }
1780   }
1781
1782   private void checkCompression(final HColumnDescriptor hcd)
1783   throws IOException {
1784     if (!this.masterCheckCompression) return;
1785     CompressionTest.testCompression(hcd.getCompressionType());
1786     CompressionTest.testCompression(hcd.getCompactionCompressionType());
1787   }
1788
1789   private void checkEncryption(final Configuration conf, final HTableDescriptor htd)
1790   throws IOException {
1791     if (!this.masterCheckEncryption) return;
1792     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1793       checkEncryption(conf, hcd);
1794     }
1795   }
1796
1797   private void checkEncryption(final Configuration conf, final HColumnDescriptor hcd)
1798   throws IOException {
1799     if (!this.masterCheckEncryption) return;
1800     EncryptionTest.testEncryption(conf, hcd.getEncryptionType(), hcd.getEncryptionKey());
1801   }
1802
1803   private void checkClassLoading(final Configuration conf, final HTableDescriptor htd)
1804   throws IOException {
1805     RegionSplitPolicy.getSplitPolicyClass(htd, conf);
1806     RegionCoprocessorHost.testTableCoprocessorAttrs(conf, htd);
1807   }
1808
1809   private static boolean isCatalogTable(final TableName tableName) {
1810     return tableName.equals(TableName.META_TABLE_NAME);
1811   }
1812
1813   @Override
1814   public long deleteTable(
1815       final TableName tableName,
1816       final long nonceGroup,
1817       final long nonce) throws IOException {
1818     checkInitialized();
1819     if (cpHost != null) {
1820       cpHost.preDeleteTable(tableName);
1821     }
1822     LOG.info(getClientIdAuditPrefix() + " delete " + tableName);
1823
1824     // TODO: We can handle/merge duplicate request
1825     ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
1826     long procId = this.procedureExecutor.submitProcedure(
1827       new DeleteTableProcedure(procedureExecutor.getEnvironment(), tableName, latch),
1828       nonceGroup,
1829       nonce);
1830     latch.await();
1831
1832     if (cpHost != null) {
1833       cpHost.postDeleteTable(tableName);
1834     }
1835
1836     return procId;
1837   }
1838
1839   @Override
1840   public long truncateTable(
1841       final TableName tableName,
1842       final boolean preserveSplits,
1843       final long nonceGroup,
1844       final long nonce) throws IOException {
1845     checkInitialized();
1846     if (cpHost != null) {
1847       cpHost.preTruncateTable(tableName);
1848     }
1849     LOG.info(getClientIdAuditPrefix() + " truncate " + tableName);
1850
1851     ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0);
1852     long procId = this.procedureExecutor.submitProcedure(
1853       new TruncateTableProcedure(procedureExecutor.getEnvironment(), tableName,
1854         preserveSplits, latch),
1855       nonceGroup,
1856       nonce);
1857     latch.await();
1858
1859     if (cpHost != null) {
1860       cpHost.postTruncateTable(tableName);
1861     }
1862     return procId;
1863   }
1864
1865   @Override
1866   public long addColumn(
1867       final TableName tableName,
1868       final HColumnDescriptor columnDescriptor,
1869       final long nonceGroup,
1870       final long nonce)
1871       throws IOException {
1872     checkInitialized();
1873     checkCompression(columnDescriptor);
1874     checkEncryption(conf, columnDescriptor);
1875     checkReplicationScope(columnDescriptor);
1876     if (cpHost != null) {
1877       if (cpHost.preAddColumn(tableName, columnDescriptor)) {
1878         return -1;
1879       }
1880     }
1881     // Execute the operation synchronously - wait for the operation to complete before continuing.
1882     ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0);
1883     long procId = this.procedureExecutor.submitProcedure(
1884       new AddColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName,
1885         columnDescriptor, latch),
1886       nonceGroup,
1887       nonce);
1888     latch.await();
1889
1890     if (cpHost != null) {
1891       cpHost.postAddColumn(tableName, columnDescriptor);
1892     }
1893     return procId;
1894   }
1895
1896   @Override
1897   public long modifyColumn(
1898       final TableName tableName,
1899       final HColumnDescriptor descriptor,
1900       final long nonceGroup,
1901       final long nonce)
1902       throws IOException {
1903     checkInitialized();
1904     checkCompression(descriptor);
1905     checkEncryption(conf, descriptor);
1906     checkReplicationScope(descriptor);
1907     if (cpHost != null) {
1908       if (cpHost.preModifyColumn(tableName, descriptor)) {
1909         return -1;
1910       }
1911     }
1912     LOG.info(getClientIdAuditPrefix() + " modify " + descriptor);
1913
1914     // Execute the operation synchronously - wait for the operation to complete before continuing.
1915     ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0);
1916     long procId = this.procedureExecutor.submitProcedure(
1917       new ModifyColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName,
1918         descriptor, latch),
1919       nonceGroup,
1920       nonce);
1921     latch.await();
1922
1923     if (cpHost != null) {
1924       cpHost.postModifyColumn(tableName, descriptor);
1925     }
1926     return procId;
1927   }
1928
1929   @Override
1930   public long deleteColumn(
1931       final TableName tableName,
1932       final byte[] columnName,
1933       final long nonceGroup,
1934       final long nonce)
1935       throws IOException {
1936     checkInitialized();
1937     if (cpHost != null) {
1938       if (cpHost.preDeleteColumn(tableName, columnName)) {
1939         return -1;
1940       }
1941     }
1942     LOG.info(getClientIdAuditPrefix() + " delete " + Bytes.toString(columnName));
1943
1944     // Execute the operation synchronously - wait for the operation to complete before continuing.
1945     ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0);
1946     long procId = this.procedureExecutor.submitProcedure(
1947       new DeleteColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName,
1948         columnName, latch),
1949       nonceGroup,
1950       nonce);
1951     latch.await();
1952
1953     if (cpHost != null) {
1954       cpHost.postDeleteColumn(tableName, columnName);
1955     }
1956     return procId;
1957   }
1958
1959   @Override
1960   public long enableTable(
1961       final TableName tableName,
1962       final long nonceGroup,
1963       final long nonce) throws IOException {
1964     checkInitialized();
1965     if (cpHost != null) {
1966       cpHost.preEnableTable(tableName);
1967     }
1968     LOG.info(getClientIdAuditPrefix() + " enable " + tableName);
1969
1970     // Execute the operation asynchronously - client will check the progress of the operation
1971     final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
1972     long procId = this.procedureExecutor.submitProcedure(
1973       new EnableTableProcedure(procedureExecutor.getEnvironment(), tableName, false, prepareLatch),
1974       nonceGroup,
1975       nonce);
1976     // Before returning to client, we want to make sure that the table is prepared to be
1977     // enabled (the table is locked and the table state is set).
1978     //
1979     // Note: if the procedure throws exception, we will catch it and rethrow.
1980     prepareLatch.await();
1981
1982     if (cpHost != null) {
1983       cpHost.postEnableTable(tableName);
1984     }
1985
1986     return procId;
1987   }
1988
1989   @Override
1990   public long disableTable(
1991       final TableName tableName,
1992       final long nonceGroup,
1993       final long nonce) throws IOException {
1994     checkInitialized();
1995     if (cpHost != null) {
1996       cpHost.preDisableTable(tableName);
1997     }
1998     LOG.info(getClientIdAuditPrefix() + " disable " + tableName);
1999
2000     // Execute the operation asynchronously - client will check the progress of the operation
2001     final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
2003     long procId = this.procedureExecutor.submitProcedure(
2004       new DisableTableProcedure(procedureExecutor.getEnvironment(), tableName, false, prepareLatch),
2005       nonceGroup,
2006       nonce);
2007     // Before returning to client, we want to make sure that the table is prepared to be
2008     // disabled (the table is locked and the table state is set).
2009     //
2010     // Note: if the procedure throws exception, we will catch it and rethrow.
2011     prepareLatch.await();
2012
2013     if (cpHost != null) {
2014       cpHost.postDisableTable(tableName);
2015     }
2016
2017     return procId;
2018   }
2019
2020   /**
2021    * Return the region and current deployment for the region containing
2022    * the given row. If the region cannot be found, returns null. If it
2023    * is found, but not currently deployed, the second element of the pair
2024    * may be null.
2025    */
2026   @VisibleForTesting // Used by TestMaster.
2027   Pair<HRegionInfo, ServerName> getTableRegionForRow(
2028       final TableName tableName, final byte [] rowKey)
2029   throws IOException {
2030     final AtomicReference<Pair<HRegionInfo, ServerName>> result =
2031       new AtomicReference<Pair<HRegionInfo, ServerName>>(null);
2032
2033     MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
2034         @Override
2035         public boolean visit(Result data) throws IOException {
2036           if (data == null || data.size() <= 0) {
2037             return true;
2038           }
2039           Pair<HRegionInfo, ServerName> pair =
2040               new Pair<HRegionInfo, ServerName>(MetaTableAccessor.getHRegionInfo(data),
2041                   MetaTableAccessor.getServerName(data, 0));
2042           if (pair.getFirst() == null) {
2043             return false;
2044           }
2045           if (!pair.getFirst().getTable().equals(tableName)) {
2046             return false;
2047           }
2048           result.set(pair);
2049           return true;
2050         }
2051     };
2052
2053     MetaTableAccessor.scanMeta(clusterConnection, visitor, tableName, rowKey, 1);
2054     return result.get();
2055   }
2056
2057   @Override
2058   public long modifyTable(
2059       final TableName tableName,
2060       final HTableDescriptor descriptor,
2061       final long nonceGroup,
2062       final long nonce)
2063       throws IOException {
2064     checkInitialized();
2065     sanityCheckTableDescriptor(descriptor);
2066     if (cpHost != null) {
2067       cpHost.preModifyTable(tableName, descriptor);
2068     }
2069
2070     LOG.info(getClientIdAuditPrefix() + " modify " + tableName);
2071
2072     // Execute the operation synchronously - wait for the operation to complete before continuing.
2073     ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0);
2074     long procId = this.procedureExecutor.submitProcedure(
2075       new ModifyTableProcedure(procedureExecutor.getEnvironment(), descriptor, latch),
2076       nonceGroup,
2077       nonce);
2078     latch.await();
2079
2080     if (cpHost != null) {
2081       cpHost.postModifyTable(tableName, descriptor);
2082     }
2083
2084     return procId;
2085   }
2086
2087   @Override
2088   public void checkTableModifiable(final TableName tableName)
2089       throws IOException, TableNotFoundException, TableNotDisabledException {
2090     if (isCatalogTable(tableName)) {
2091       throw new IOException("Can't modify catalog tables");
2092     }
2093     if (!MetaTableAccessor.tableExists(getConnection(), tableName)) {
2094       throw new TableNotFoundException(tableName);
2095     }
2096     if (!getTableStateManager().isTableState(tableName, TableState.State.DISABLED)) {
2097       throw new TableNotDisabledException(tableName);
2098     }
2099   }
2100
2101   /**
2102    * @return cluster status
2103    */
2104   public ClusterStatus getClusterStatus() throws InterruptedIOException {
2105     // Build Set of backup masters from ZK nodes
2106     List<String> backupMasterStrings;
2107     try {
2108       backupMasterStrings = ZKUtil.listChildrenNoWatch(this.zooKeeper,
2109         this.zooKeeper.backupMasterAddressesZNode);
2110     } catch (KeeperException e) {
2111       LOG.warn(this.zooKeeper.prefix("Unable to list backup servers"), e);
2112       backupMasterStrings = null;
2113     }
2114
2115     List<ServerName> backupMasters = null;
2116     if (backupMasterStrings != null && !backupMasterStrings.isEmpty()) {
2117       backupMasters = new ArrayList<ServerName>(backupMasterStrings.size());
2118       for (String s: backupMasterStrings) {
2119         try {
2120           byte [] bytes;
2121           try {
2122             bytes = ZKUtil.getData(this.zooKeeper, ZKUtil.joinZNode(
2123                 this.zooKeeper.backupMasterAddressesZNode, s));
2124           } catch (InterruptedException e) {
2125             throw new InterruptedIOException();
2126           }
2127           if (bytes != null) {
2128             ServerName sn;
2129             try {
2130               sn = ServerName.parseFrom(bytes);
2131             } catch (DeserializationException e) {
2132               LOG.warn("Failed parse, skipping registering backup server", e);
2133               continue;
2134             }
2135             backupMasters.add(sn);
2136           }
2137         } catch (KeeperException e) {
2138           LOG.warn(this.zooKeeper.prefix("Unable to get information about " +
2139                    "backup servers"), e);
2140         }
2141       }
2142       Collections.sort(backupMasters, new Comparator<ServerName>() {
2143         @Override
2144         public int compare(ServerName s1, ServerName s2) {
2145           return s1.getServerName().compareTo(s2.getServerName());
2146         }});
2147     }
2148
2149     String clusterId = fileSystemManager != null ?
2150       fileSystemManager.getClusterId().toString() : null;
2151     Set<RegionState> regionsInTransition = assignmentManager != null ?
2152       assignmentManager.getRegionStates().getRegionsInTransition() : null;
2153     String[] coprocessors = cpHost != null ? getMasterCoprocessors() : null;
2154     boolean balancerOn = loadBalancerTracker != null ?
2155       loadBalancerTracker.isBalancerOn() : false;
2156     Map<ServerName, ServerLoad> onlineServers = null;
2157     Set<ServerName> deadServers = null;
2158     if (serverManager != null) {
2159       deadServers = serverManager.getDeadServers().copyServerNames();
2160       onlineServers = serverManager.getOnlineServers();
2161     }
2162     return new ClusterStatus(VersionInfo.getVersion(), clusterId,
2163       onlineServers, deadServers, serverName, backupMasters,
2164       regionsInTransition, coprocessors, balancerOn);
2165   }
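  // Typical consumption of the status assembled above (sketch; the master
  // handle is assumed):
  //
  //   ClusterStatus status = master.getClusterStatus();
  //   int live = status.getServersSize();
  //   int dead = status.getDeadServers();
  //   double avgLoad = status.getAverageLoad();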
2166
2167   /**
2168    * The set of loaded coprocessors is stored in a static set. Since it's
2169    * statically allocated, it does not require that HMaster's cpHost be
2170    * initialized prior to accessing it.
2171    * @return a String representation of the set of names of the loaded coprocessors.
2172    */
2173   public static String getLoadedCoprocessors() {
2174     return CoprocessorHost.getLoadedCoprocessors().toString();
2175   }
2176
2177   /**
2178    * @return timestamp in millis when HMaster was started.
2179    */
2180   public long getMasterStartTime() {
2181     return startcode;
2182   }
2183
2184   /**
2185    * @return timestamp in millis when HMaster became the active master.
2186    */
2187   public long getMasterActiveTime() {
2188     return masterActiveTime;
2189   }
2190
2191   public int getNumWALFiles() {
2192     return procedureStore != null ? procedureStore.getActiveLogs().size() : 0;
2193   }
2194
2195   public WALProcedureStore getWalProcedureStore() {
2196     return procedureStore;
2197   }
2198
2199   public int getRegionServerInfoPort(final ServerName sn) {
2200     RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
2201     if (info == null || info.getInfoPort() == 0) {
2202       return conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
2203         HConstants.DEFAULT_REGIONSERVER_INFOPORT);
2204     }
2205     return info.getInfoPort();
2206   }
2207
2208   public String getRegionServerVersion(final ServerName sn) {
2209     RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
2210     if (info != null && info.hasVersionInfo()) {
2211       return info.getVersionInfo().getVersion();
2212     }
2213     return "Unknown";
2214   }
2215
2216   /**
2217    * @return array of coprocessor SimpleNames.
2218    */
2219   public String[] getMasterCoprocessors() {
2220     Set<String> masterCoprocessors = getMasterCoprocessorHost().getCoprocessors();
2221     return masterCoprocessors.toArray(new String[masterCoprocessors.size()]);
2222   }
2223
2224   @Override
2225   public void abort(final String msg, final Throwable t) {
2226     if (isAborted() || isStopped()) {
2227       return;
2228     }
2229     if (cpHost != null) {
2230       // HBASE-4014: dump a list of loaded coprocessors.
2231       LOG.fatal("Master server abort: loaded coprocessors are: " +
2232           getLoadedCoprocessors());
2233     }
2234     if (t != null) LOG.fatal(msg, t);
2235     try {
2236       stopMaster();
2237     } catch (IOException e) {
2238       LOG.error("Exception occurred while stopping master", e);
2239     }
2240   }
2241
2242   @Override
2243   public ZooKeeperWatcher getZooKeeper() {
2244     return zooKeeper;
2245   }
2246
2247   @Override
2248   public MasterCoprocessorHost getMasterCoprocessorHost() {
2249     return cpHost;
2250   }
2251
2252   @Override
2253   public MasterQuotaManager getMasterQuotaManager() {
2254     return quotaManager;
2255   }
2256
2257   @Override
2258   public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
2259     return procedureExecutor;
2260   }
2261
2262   @Override
2263   public ServerName getServerName() {
2264     return this.serverName;
2265   }
2266
2267   @Override
2268   public AssignmentManager getAssignmentManager() {
2269     return this.assignmentManager;
2270   }
2271
2272   @Override
2273   public CatalogJanitor getCatalogJanitor() {
2274     return this.catalogJanitorChore;
2275   }
2276
2277   public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
2278     return rsFatals;
2279   }
2280
2281   public void shutdown() throws IOException {
2282     if (cpHost != null) {
2283       cpHost.preShutdown();
2284     }
2285
2286     if (this.serverManager != null) {
2287       this.serverManager.shutdownCluster();
2288     }
2289     if (this.clusterStatusTracker != null){
2290       try {
2291         this.clusterStatusTracker.setClusterDown();
2292       } catch (KeeperException e) {
2293         LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
2294       }
2295     }
2296   }
2297
2298   public void stopMaster() throws IOException {
2299     if (cpHost != null) {
2300       cpHost.preStopMaster();
2301     }
2302     stop("Stopped by " + Thread.currentThread().getName());
2303   }
2304
2305   void checkServiceStarted() throws ServerNotRunningYetException {
2306     if (!serviceStarted) {
2307       throw new ServerNotRunningYetException("Server is not running yet");
2308     }
2309   }
2310
2311   void checkInitialized() throws PleaseHoldException, ServerNotRunningYetException {
2312     checkServiceStarted();
2313     if (!isInitialized()) throw new PleaseHoldException("Master is initializing");
2314   }
2315
2316   /**
2317    * Report whether this master is currently the active master or not.
2318    * If not active master, we are parked on ZK waiting to become active.
2319    *
2320    * This method is used for testing.
2321    *
2322    * @return true if active master, false if not.
2323    */
2324   @Override
2325   public boolean isActiveMaster() {
2326     return isActiveMaster;
2327   }
2328
2329   /**
2330    * Report whether this master has completed with its initialization and is
2331    * ready.  If ready, the master is also the active master.  A standby master
2332    * is never ready.
2333    *
2334    * This method is used for testing.
2335    *
2336    * @return true if master is ready to go, false if not.
2337    */
2338   @Override
2339   public boolean isInitialized() {
2340     return initialized.isReady();
2341   }
2342
2343   /**
2344    * Report whether this master is in maintenance mode.
2345    *
2346    * @return true if master is in maintenanceMode
2347    */
2348   @Override
2349   public boolean isInMaintenanceMode() {
2350     return maintenanceModeTracker.isInMaintenanceMode();
2351   }
2352
2353   @VisibleForTesting
2354   public void setInitialized(boolean isInitialized) {
2355     procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized);
2356   }
2357
2358   public ProcedureEvent getInitializedEvent() {
2359     return initialized;
2360   }
2361
2362   /**
2363    * ServerCrashProcessingEnabled is set to false before completing assignMeta to prevent
2364    * processing of crashed servers.
2365    * @return true if assignMeta has completed
2366    */
2367   @Override
2368   public boolean isServerCrashProcessingEnabled() {
2369     return serverCrashProcessingEnabled.isReady();
2370   }
2371
2372   @VisibleForTesting
2373   public void setServerCrashProcessingEnabled(final boolean b) {
2374     procedureExecutor.getEnvironment().setEventReady(serverCrashProcessingEnabled, b);
2375   }
2376
2377   public ProcedureEvent getServerCrashProcessingEnabledEvent() {
2378     return serverCrashProcessingEnabled;
2379   }
2380
2381   /**
2382    * Report whether this master has started initialization and is about to do meta region assignment
2383    * @return true if master is in initialization &amp; about to assign hbase:meta regions
2384    */
2385   public boolean isInitializationStartsMetaRegionAssignment() {
2386     return this.initializationBeforeMetaAssignment;
2387   }
2388
2389   /**
2390    * Compute the average load across all region servers.
2391    * Currently, this uses a very naive computation - just uses the number of
2392    * regions being served, ignoring stats about number of requests.
2393    * @return the average load
2394    */
2395   public double getAverageLoad() {
2396     if (this.assignmentManager == null) {
2397       return 0;
2398     }
2399
2400     RegionStates regionStates = this.assignmentManager.getRegionStates();
2401     if (regionStates == null) {
2402       return 0;
2403     }
2404     return regionStates.getAverageLoad();
2405   }
2406
2407   /*
2408    * @return the count of region split plans executed
2409    */
2410   public long getSplitPlanCount() {
2411     return splitPlanCount;
2412   }
2413
2414   /*
2415    * @return the count of region merge plans executed
2416    */
2417   public long getMergePlanCount() {
2418     return mergePlanCount;
2419   }
2420
2421   @Override
2422   public boolean registerService(Service instance) {
2423     /*
2424      * No stacking of instances is allowed for a single service name
2425      */
2426     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
2427     String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
2428     if (coprocessorServiceHandlers.containsKey(serviceName)) {
2429       LOG.error("Coprocessor service " + serviceName
2430           + " already registered, rejecting request from " + instance);
2432       return false;
2433     }
2434
2435     coprocessorServiceHandlers.put(serviceName, instance);
2436     if (LOG.isDebugEnabled()) {
2437       LOG.debug("Registered master coprocessor service: service="+serviceName);
2438     }
2439     return true;
2440   }
2441
2442   /**
2443    * Utility for constructing an instance of the passed HMaster class.
2444    * @param masterClass the HMaster class (or subclass) to instantiate
2445    * @return HMaster instance.
2446    */
2447   public static HMaster constructMaster(Class<? extends HMaster> masterClass,
2448       final Configuration conf, final CoordinatedStateManager cp)  {
2449     try {
2450       Constructor<? extends HMaster> c =
2451         masterClass.getConstructor(Configuration.class, CoordinatedStateManager.class);
2452       return c.newInstance(conf, cp);
2453     } catch(Exception e) {
2454       Throwable error = e;
2455       if (e instanceof InvocationTargetException &&
2456           ((InvocationTargetException)e).getTargetException() != null) {
2457         error = ((InvocationTargetException)e).getTargetException();
2458       }
2459       throw new RuntimeException("Failed construction of Master: " + masterClass.toString(),
2460         error);
2461     }
2462   }
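  // Usage sketch, roughly what HMasterCommandLine does (csm is a
  // CoordinatedStateManager obtained for the configuration):
  //
  //   HMaster master = HMaster.constructMaster(HMaster.class, conf, csm);
  //   master.start();
  //   master.join();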
2463
2464   /**
2465    * @see org.apache.hadoop.hbase.master.HMasterCommandLine
2466    */
2467   public static void main(String [] args) {
2468     VersionInfo.logVersion();
2469     new HMasterCommandLine(HMaster.class).doMain(args);
2470   }
2471
2472   public HFileCleaner getHFileCleaner() {
2473     return this.hfileCleaner;
2474   }
2475
2476   /**
2477    * @return the underlying snapshot manager
2478    */
2479   @Override
2480   public SnapshotManager getSnapshotManager() {
2481     return this.snapshotManager;
2482   }
2483
2484   /**
2485    * @return the underlying MasterProcedureManagerHost
2486    */
2487   @Override
2488   public MasterProcedureManagerHost getMasterProcedureManagerHost() {
2489     return mpmHost;
2490   }
2491
2492   @Override
2493   public ClusterSchema getClusterSchema() {
2494     return this.clusterSchemaService;
2495   }
2496
2497   /**
2498    * Create a new Namespace.
2499    * @param namespaceDescriptor descriptor for new Namespace
2500    * @param nonceGroup Identifier for the source of the request, a client or process.
2501    * @param nonce A unique identifier for this operation from the client or process identified by
2502    * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
2503    * @return procedure id
2504    */
2505   long createNamespace(final NamespaceDescriptor namespaceDescriptor, final long nonceGroup,
2506       final long nonce)
2507   throws IOException {
2508     checkInitialized();
2509     TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName()));
2510     if (this.cpHost != null && this.cpHost.preCreateNamespace(namespaceDescriptor)) {
2511       throw new BypassCoprocessorException();
2512     }
2513     LOG.info(getClientIdAuditPrefix() + " creating " + namespaceDescriptor);
2514     // Execute the operation synchronously - wait for the operation to complete before continuing.
2515     long procId = getClusterSchema().createNamespace(namespaceDescriptor, nonceGroup, nonce);
2516     if (this.cpHost != null) this.cpHost.postCreateNamespace(namespaceDescriptor);
2517     return procId;
2518   }
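  // Client-side sketch, assuming the 1.x Admin API; the RPC lands in
  // createNamespace() above:
  //
  //   admin.createNamespace(NamespaceDescriptor.create("analytics").build());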
2519
2520   /**
2521    * Modify an existing Namespace.
2522    * @param nonceGroup Identifier for the source of the request, a client or process.
2523    * @param nonce A unique identifier for this operation from the client or process identified by
2524    * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
2525    * @return procedure id
2526    */
2527   long modifyNamespace(final NamespaceDescriptor namespaceDescriptor, final long nonceGroup,
2528       final long nonce)
2529   throws IOException {
2530     checkInitialized();
2531     TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName()));
2532     if (this.cpHost != null && this.cpHost.preModifyNamespace(namespaceDescriptor)) {
2533       throw new BypassCoprocessorException();
2534     }
2535     LOG.info(getClientIdAuditPrefix() + " modify " + namespaceDescriptor);
2536     // Execute the operation synchronously - wait for the operation to complete before continuing.
2537     long procId = getClusterSchema().modifyNamespace(namespaceDescriptor, nonceGroup, nonce);
2538     if (this.cpHost != null) this.cpHost.postModifyNamespace(namespaceDescriptor);
2539     return procId;
2540   }
2541
2542   /**
2543    * Delete an existing Namespace. Only empty Namespaces (no tables) can be removed.
2544    * @param nonceGroup Identifier for the source of the request, a client or process.
2545    * @param nonce A unique identifier for this operation from the client or process identified by
2546    * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
2547    * @return procedure id
2548    */
2549   long deleteNamespace(final String name, final long nonceGroup, final long nonce)
2550   throws IOException {
2551     checkInitialized();
2552     if (this.cpHost != null && this.cpHost.preDeleteNamespace(name)) {
2553       throw new BypassCoprocessorException();
2554     }
2555     LOG.info(getClientIdAuditPrefix() + " delete " + name);
2556     // Execute the operation synchronously - wait for the operation to complete before continuing.
2557     long procId = getClusterSchema().deleteNamespace(name, nonceGroup, nonce);
2558     if (this.cpHost != null) this.cpHost.postDeleteNamespace(name);
2559     return procId;
2560   }
2561
2562   /**
2563    * Get a Namespace
2564    * @param name Name of the Namespace
2565    * @return Namespace descriptor for <code>name</code>
2566    */
2567   NamespaceDescriptor getNamespace(String name) throws IOException {
2568     checkInitialized();
2569     if (this.cpHost != null) this.cpHost.preGetNamespaceDescriptor(name);
2570     NamespaceDescriptor nsd = this.clusterSchemaService.getNamespace(name);
2571     if (this.cpHost != null) this.cpHost.postGetNamespaceDescriptor(nsd);
2572     return nsd;
2573   }
2574
2575   /**
2576    * Get all Namespaces
2577    * @return All Namespace descriptors
2578    */
2579   List<NamespaceDescriptor> getNamespaces() throws IOException {
2580     checkInitialized();
2581     final List<NamespaceDescriptor> nsds = new ArrayList<NamespaceDescriptor>();
2582     boolean bypass = false;
2583     if (cpHost != null) {
2584       bypass = cpHost.preListNamespaceDescriptors(nsds);
2585     }
2586     if (!bypass) {
2587       nsds.addAll(this.clusterSchemaService.getNamespaces());
2588       if (this.cpHost != null) this.cpHost.postListNamespaceDescriptors(nsds);
2589     }
2590     return nsds;
2591   }
2592
2593   @Override
2594   public List<TableName> listTableNamesByNamespace(String name) throws IOException {
2595     checkInitialized();
2596     return listTableNames(name, null, true);
2597   }
2598
2599   @Override
2600   public List<HTableDescriptor> listTableDescriptorsByNamespace(String name) throws IOException {
2601     checkInitialized();
2602     return listTableDescriptors(name, null, null, true);
2603   }
2604
2605   @Override
2606   public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning)
2607       throws IOException {
2608     if (cpHost != null) {
2609       cpHost.preAbortProcedure(this.procedureExecutor, procId);
2610     }
2611
2612     final boolean result = this.procedureExecutor.abort(procId, mayInterruptIfRunning);
2613
2614     if (cpHost != null) {
2615       cpHost.postAbortProcedure();
2616     }
2617
2618     return result;
2619   }
2620
2621   @Override
2622   public List<ProcedureInfo> listProcedures() throws IOException {
2623     if (cpHost != null) {
2624       cpHost.preListProcedures();
2625     }
2626
2627     final List<ProcedureInfo> procInfoList = this.procedureExecutor.listProcedures();
2628
2629     if (cpHost != null) {
2630       cpHost.postListProcedures(procInfoList);
2631     }
2632
2633     return procInfoList;
2634   }
2635
2636   /**
2637    * Returns the list of table descriptors that match the specified request
2638    * @param namespace the namespace to query, or null if querying for all
2639    * @param regex The regular expression to match against, or null if querying for all
2640    * @param tableNameList the list of table names, or null if querying for all
2641    * @param includeSysTables False to match only against userspace tables
2642    * @return the list of table descriptors
2643    */
2644   public List<HTableDescriptor> listTableDescriptors(final String namespace, final String regex,
2645       final List<TableName> tableNameList, final boolean includeSysTables)
2646   throws IOException {
2647     List<HTableDescriptor> htds = new ArrayList<HTableDescriptor>();
2648     boolean bypass = cpHost != null?
2649         cpHost.preGetTableDescriptors(tableNameList, htds, regex): false;
2650     if (!bypass) {
2651       htds = getTableDescriptors(htds, namespace, regex, tableNameList, includeSysTables);
2652       if (cpHost != null) {
2653         cpHost.postGetTableDescriptors(tableNameList, htds, regex);
2654       }
2655     }
2656     return htds;
2657   }
2658
2659   /**
2660    * Returns the list of table names that match the specified request
2661    * @param regex The regular expression to match against, or null if querying for all
2662    * @param namespace the namespace to query, or null if querying for all
2663    * @param includeSysTables False to match only against userspace tables
2664    * @return the list of table names
2665    */
2666   public List<TableName> listTableNames(final String namespace, final String regex,
2667       final boolean includeSysTables) throws IOException {
2668     List<HTableDescriptor> htds = new ArrayList<HTableDescriptor>();
2669     boolean bypass = cpHost != null? cpHost.preGetTableNames(htds, regex): false;
2670     if (!bypass) {
2671       htds = getTableDescriptors(htds, namespace, regex, null, includeSysTables);
2672       if (cpHost != null) cpHost.postGetTableNames(htds, regex);
2673     }
2674     List<TableName> result = new ArrayList<TableName>(htds.size());
2675     for (HTableDescriptor htd: htds) result.add(htd.getTableName());
2676     return result;
2677   }
2678
2679   /**
2680    * @return list of table descriptors after filtering by regex and whether to include system
2681    *    tables, etc.
2682    * @throws IOException
2683    */
2684   private List<HTableDescriptor> getTableDescriptors(final List<HTableDescriptor> htds,
2685       final String namespace, final String regex, final List<TableName> tableNameList,
2686       final boolean includeSysTables)
2687   throws IOException {
2688     if (tableNameList == null || tableNameList.size() == 0) {
2689       // request for all TableDescriptors
2690       Collection<HTableDescriptor> allHtds;
2691       if (namespace != null && namespace.length() > 0) {
2692         // Check that the namespace exists; this will fail if it does not.
2693         this.clusterSchemaService.getNamespace(namespace);
2694         allHtds = tableDescriptors.getByNamespace(namespace).values();
2695       } else {
2696         allHtds = tableDescriptors.getAll().values();
2697       }
2698       for (HTableDescriptor desc: allHtds) {
2699         if (tableStateManager.isTablePresent(desc.getTableName())
2700             && (includeSysTables || !desc.getTableName().isSystemTable())) {
2701           htds.add(desc);
2702         }
2703       }
2704     } else {
2705       for (TableName s: tableNameList) {
2706         if (tableStateManager.isTablePresent(s)) {
2707           HTableDescriptor desc = tableDescriptors.get(s);
2708           if (desc != null) {
2709             htds.add(desc);
2710           }
2711         }
2712       }
2713     }
2714
2715     // Retains only those matched by regular expression.
2716     if (regex != null) filterTablesByRegex(htds, Pattern.compile(regex));
2717     return htds;
2718   }
2719
2720   /**
2721    * Removes the table descriptors that don't match the pattern.
2722    * @param descriptors list of table descriptors to filter
2723    * @param pattern the regex to use
2724    */
2725   private static void filterTablesByRegex(final Collection<HTableDescriptor> descriptors,
2726       final Pattern pattern) {
2727     final String defaultNS = NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
2728     Iterator<HTableDescriptor> itr = descriptors.iterator();
2729     while (itr.hasNext()) {
2730       HTableDescriptor htd = itr.next();
2731       String tableName = htd.getTableName().getNameAsString();
2732       boolean matched = pattern.matcher(tableName).matches();
2733       if (!matched && htd.getTableName().getNamespaceAsString().equals(defaultNS)) {
2734         matched = pattern.matcher(defaultNS + TableName.NAMESPACE_DELIM + tableName).matches();
2735       }
2736       if (!matched) {
2737         itr.remove();
2738       }
2739     }
2740   }
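  // Worked example (illustrative, not part of HMaster): the default-namespace
  // fallback above means a table in the "default" namespace is matched either by
  // its short name or by its fully qualified form. Plain java.util.regex:
  //
  //   Pattern p = Pattern.compile("default:web.*");
  //   String shortName = "webusers";                  // short name in default ns
  //   p.matcher(shortName).matches();                 // false on the first try
  //   p.matcher("default" + TableName.NAMESPACE_DELIM + shortName).matches();
  //                                                   // true -> descriptor kept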
2741
2742   @Override
2743   public long getLastMajorCompactionTimestamp(TableName table) throws IOException {
2744     return getClusterStatus().getLastMajorCompactionTsForTable(table);
2745   }
2746
2747   @Override
2748   public long getLastMajorCompactionTimestampForRegion(byte[] regionName) throws IOException {
2749     return getClusterStatus().getLastMajorCompactionTsForRegion(regionName);
2750   }
2751
2752   /**
2753    * Gets the mob file compaction state for a specific table.
2754    * Whether all the mob files are selected is known only while the compaction executes, but
2755    * this state is recorded just before the compaction starts, so the compaction type cannot
2756    * be determined at that point. A rough state is therefore reported: only two compaction
2757    * states are available, CompactionState.MAJOR_AND_MINOR and CompactionState.NONE.
2758    * @param tableName The current table name.
2759    * @return Whether the given table is currently under mob file compaction.
2760    */
2761   public CompactionState getMobCompactionState(TableName tableName) {
2762     AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
2763     if (compactionsCount != null && compactionsCount.get() != 0) {
2764       return CompactionState.MAJOR_AND_MINOR;
2765     }
2766     return CompactionState.NONE;
2767   }
2768
2769   public void reportMobCompactionStart(TableName tableName) throws IOException {
2770     IdLock.Entry lockEntry = null;
2771     try {
2772       lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
2773       AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
2774       if (compactionsCount == null) {
2775         compactionsCount = new AtomicInteger(0);
2776         mobCompactionStates.put(tableName, compactionsCount);
2777       }
2778       compactionsCount.incrementAndGet();
2779     } finally {
2780       if (lockEntry != null) {
2781         mobCompactionLock.releaseLockEntry(lockEntry);
2782       }
2783     }
2784   }
2785
2786   public void reportMobCompactionEnd(TableName tableName) throws IOException {
2787     IdLock.Entry lockEntry = null;
2788     try {
2789       lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
2790       AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
2791       if (compactionsCount != null) {
2792         int count = compactionsCount.decrementAndGet();
2793         // remove the entry if the count is 0.
2794         if (count == 0) {
2795           mobCompactionStates.remove(tableName);
2796         }
2797       }
2798     } finally {
2799       if (lockEntry != null) {
2800         mobCompactionLock.releaseLockEntry(lockEntry);
2801       }
2802     }
2803   }
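  // Illustrative trace (not part of HMaster) of the counting protocol shared by
  // getMobCompactionState and the two report methods above, assuming two
  // overlapping compactions of the same table:
  //
  //   reportMobCompactionStart(t);  // count 0 -> 1, state MAJOR_AND_MINOR
  //   reportMobCompactionStart(t);  // count 1 -> 2
  //   reportMobCompactionEnd(t);    // count 2 -> 1, still MAJOR_AND_MINOR
  //   reportMobCompactionEnd(t);    // count 1 -> 0, entry removed, state NONE
  //
  // The IdLock taken on tableName.hashCode() serializes start/end for a given
  // table, so the get-or-create and remove-at-zero steps cannot race.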
2804
2805   /**
2806    * Requests mob compaction.
2807    * @param tableName The table to compact.
2808    * @param columns The column families to compact.
2809    * @param allFiles Whether to include all mob files in the compaction.
2810    */
2811   public void requestMobCompaction(TableName tableName,
2812     List<HColumnDescriptor> columns, boolean allFiles) throws IOException {
2813     mobCompactThread.requestMobCompaction(conf, fs, tableName, columns,
2814       tableLockManager, allFiles);
2815   }
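  // Illustrative sketch (not part of HMaster): a caller would typically collect
  // the mob-enabled families of a table before requesting compaction. Assumes
  // HColumnDescriptor#isMobEnabled() as in the mob-capable branches:
  //
  //   HTableDescriptor htd = tableDescriptors.get(tableName);
  //   List<HColumnDescriptor> mobFamilies = new ArrayList<HColumnDescriptor>();
  //   for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
  //     if (hcd.isMobEnabled()) {
  //       mobFamilies.add(hcd);
  //     }
  //   }
  //   requestMobCompaction(tableName, mobFamilies, true); // all mob files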
2816
2817   /**
2818    * Queries the state of the {@link LoadBalancerTracker}. If the balancer is not initialized,
2819    * false is returned.
2820    *
2821    * @return The state of the load balancer, or false if the load balancer isn't defined.
2822    */
2823   public boolean isBalancerOn() {
2824     if (null == loadBalancerTracker || isInMaintenanceMode()) {
2825       return false;
2826     }
2827     return loadBalancerTracker.isBalancerOn();
2828   }
2829
2830   /**
2831    * Queries the state of the {@link RegionNormalizerTracker}. If it's not initialized,
2832    * false is returned.
2833    */
2834   public boolean isNormalizerOn() {
2835     return (null == regionNormalizerTracker || isInMaintenanceMode()) ?
2836         false : regionNormalizerTracker.isNormalizerOn();
2837   }
2838
2839   /**
2840    * Queries the state of the {@link SplitOrMergeTracker}. If it is not initialized,
2841    * false is returned. If switchType is illegal, false is returned as well.
2842    * @param switchType see {@link org.apache.hadoop.hbase.client.MasterSwitchType}
2843    * @return The state of the switch
2844    */
2845   public boolean isSplitOrMergeEnabled(MasterSwitchType switchType) {
2846     if (null == splitOrMergeTracker || isInMaintenanceMode()) {
2847       return false;
2848     }
2849     return splitOrMergeTracker.isSplitOrMergeEnabled(switchType);
2850   }
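  // Illustrative sketch (not part of HMaster): the switch is queried per type,
  // e.g. splits may be enabled while merges are disabled:
  //
  //   boolean splitOn = isSplitOrMergeEnabled(MasterSwitchType.SPLIT);
  //   boolean mergeOn = isSplitOrMergeEnabled(MasterSwitchType.MERGE);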
2851
2852   /**
2853    * Fetch the configured {@link LoadBalancer} class name. If none is set, a default is returned.
2854    *
2855    * @return The name of the {@link LoadBalancer} in use.
2856    */
2857   public String getLoadBalancerClassName() {
2858     return conf.get(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, LoadBalancerFactory
2859         .getDefaultLoadBalancerClass().getName());
2860   }
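  // Illustrative sketch (not part of HMaster): the balancer implementation is
  // chosen via configuration; e.g. selecting the stochastic balancer explicitly
  // (the class name is HBase's own, but treat the snippet as a sketch):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.set(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
  //       "org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer");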
2861
2862   /**
2863    * @return RegionNormalizerTracker instance
2864    */
2865   public RegionNormalizerTracker getRegionNormalizerTracker() {
2866     return regionNormalizerTracker;
2867   }
2868
2869   public SplitOrMergeTracker getSplitOrMergeTracker() {
2870     return splitOrMergeTracker;
2871   }
2872
2873   @Override
2874   public LoadBalancer getLoadBalancer() {
2875     return balancer;
2876   }
2877 }