View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import com.google.common.annotations.VisibleForTesting;
22  import com.google.common.collect.Lists;
23  import com.google.common.collect.Maps;
24  import com.google.protobuf.Descriptors;
25  import com.google.protobuf.Service;
26  
27  
28  import java.io.IOException;
29  import java.io.InterruptedIOException;
30  import java.lang.reflect.Constructor;
31  import java.lang.reflect.InvocationTargetException;
32  import java.net.InetAddress;
33  import java.net.InetSocketAddress;
34  import java.net.UnknownHostException;
35  import java.util.ArrayList;
36  import java.util.Arrays;
37  import java.util.Collection;
38  import java.util.Collections;
39  import java.util.Comparator;
40  import java.util.HashSet;
41  import java.util.Iterator;
42  import java.util.List;
43  import java.util.Map;
44  import java.util.Map.Entry;
45  import java.util.Set;
46  import java.util.concurrent.TimeUnit;
47  import java.util.concurrent.atomic.AtomicInteger;
48  import java.util.concurrent.atomic.AtomicReference;
49  import java.util.regex.Pattern;
50  
51  import javax.servlet.ServletException;
52  import javax.servlet.http.HttpServlet;
53  import javax.servlet.http.HttpServletRequest;
54  import javax.servlet.http.HttpServletResponse;
55  
56  import org.apache.commons.logging.Log;
57  import org.apache.commons.logging.LogFactory;
58  import org.apache.hadoop.conf.Configuration;
59  import org.apache.hadoop.fs.Path;
60  import org.apache.hadoop.hbase.ClusterStatus;
61  import org.apache.hadoop.hbase.CoordinatedStateException;
62  import org.apache.hadoop.hbase.CoordinatedStateManager;
63  import org.apache.hadoop.hbase.DoNotRetryIOException;
64  import org.apache.hadoop.hbase.HBaseIOException;
65  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
66  import org.apache.hadoop.hbase.HColumnDescriptor;
67  import org.apache.hadoop.hbase.HConstants;
68  import org.apache.hadoop.hbase.HRegionInfo;
69  import org.apache.hadoop.hbase.HTableDescriptor;
70  import org.apache.hadoop.hbase.MasterNotRunningException;
71  import org.apache.hadoop.hbase.MetaTableAccessor;
72  import org.apache.hadoop.hbase.NamespaceDescriptor;
73  import org.apache.hadoop.hbase.PleaseHoldException;
74  import org.apache.hadoop.hbase.ProcedureInfo;
75  import org.apache.hadoop.hbase.RegionStateListener;
76  import org.apache.hadoop.hbase.Server;
77  import org.apache.hadoop.hbase.ServerLoad;
78  import org.apache.hadoop.hbase.ServerName;
79  import org.apache.hadoop.hbase.TableDescriptor;
80  import org.apache.hadoop.hbase.TableDescriptors;
81  import org.apache.hadoop.hbase.TableName;
82  import org.apache.hadoop.hbase.TableNotDisabledException;
83  import org.apache.hadoop.hbase.TableNotFoundException;
84  import org.apache.hadoop.hbase.UnknownRegionException;
85  import org.apache.hadoop.hbase.classification.InterfaceAudience;
86  import org.apache.hadoop.hbase.client.Admin;
87  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
88  import org.apache.hadoop.hbase.client.Result;
89  import org.apache.hadoop.hbase.client.TableState;
90  import org.apache.hadoop.hbase.coprocessor.BypassCoprocessorException;
91  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
92  import org.apache.hadoop.hbase.exceptions.DeserializationException;
93  import org.apache.hadoop.hbase.executor.ExecutorType;
94  import org.apache.hadoop.hbase.ipc.RpcServer;
95  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
96  import org.apache.hadoop.hbase.master.MasterRpcServices.BalanceSwitchMode;
97  import org.apache.hadoop.hbase.master.balancer.BalancerChore;
98  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
99  import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
100 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
101 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
102 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
103 import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
104 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
105 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
106 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
107 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
108 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
109 import org.apache.hadoop.hbase.master.procedure.AddColumnFamilyProcedure;
110 import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
111 import org.apache.hadoop.hbase.master.procedure.DeleteColumnFamilyProcedure;
112 import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
113 import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
114 import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
115 import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
116 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
117 import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
118 import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure;
119 import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
120 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
121 import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
122 import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
123 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
124 import org.apache.hadoop.hbase.mob.MobConstants;
125 import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
126 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
127 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
128 import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
129 import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
130 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
131 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
132 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
133 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
134 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
135 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
136 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
137 import org.apache.hadoop.hbase.regionserver.HRegionServer;
138 import org.apache.hadoop.hbase.regionserver.HStore;
139 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
140 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
141 import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
142 import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
143 import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
144 import org.apache.hadoop.hbase.replication.master.TableCFsUpdater;
145 import org.apache.hadoop.hbase.replication.regionserver.Replication;
146 import org.apache.hadoop.hbase.security.User;
147 import org.apache.hadoop.hbase.security.UserProvider;
148 import org.apache.hadoop.hbase.util.Addressing;
149 import org.apache.hadoop.hbase.util.Bytes;
150 import org.apache.hadoop.hbase.util.CompressionTest;
151 import org.apache.hadoop.hbase.util.EncryptionTest;
152 import org.apache.hadoop.hbase.util.FSUtils;
153 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
154 import org.apache.hadoop.hbase.util.HasThread;
155 import org.apache.hadoop.hbase.util.IdLock;
156 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
157 import org.apache.hadoop.hbase.util.Pair;
158 import org.apache.hadoop.hbase.util.Threads;
159 import org.apache.hadoop.hbase.util.VersionInfo;
160 import org.apache.hadoop.hbase.util.ZKDataMigrator;
161 import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
162 import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
163 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
164 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
165 import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
166 import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
167 import org.apache.hadoop.hbase.zookeeper.SplitOrMergeTracker;
168 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
169 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
170 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
171 import org.apache.zookeeper.KeeperException;
172 import org.mortbay.jetty.Connector;
173 import org.mortbay.jetty.nio.SelectChannelConnector;
174 import org.mortbay.jetty.servlet.Context;
175 
176 /**
177  * HMaster is the "master server" for HBase. An HBase cluster has one active
178  * master.  If many masters are started, all compete.  Whichever wins goes on to
179  * run the cluster.  All others park themselves in their constructor until
180  * master or cluster shutdown or until the active master loses its lease in
 * zookeeper.  Thereafter, all running masters jostle to take over the master role.
182  *
 * <p>The Master can be asked to shut down the cluster. See {@link #shutdown()}.  In
184  * this case it will tell all regionservers to go down and then wait on them
185  * all reporting in that they are down.  This master will then shut itself down.
186  *
187  * <p>You can also shutdown just this master.  Call {@link #stopMaster()}.
188  *
189  * @see org.apache.zookeeper.Watcher
190  */
191 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
192 @SuppressWarnings("deprecation")
193 public class HMaster extends HRegionServer implements MasterServices {
194   private static final Log LOG = LogFactory.getLog(HMaster.class.getName());
195 
196   /**
197    * Protection against zombie master. Started once Master accepts active responsibility and
198    * starts taking over responsibilities. Allows a finite time window before giving up ownership.
199    */
200   private static class InitializationMonitor extends HasThread {
201     /** The amount of time in milliseconds to sleep before checking initialization status. */
202     public static final String TIMEOUT_KEY = "hbase.master.initializationmonitor.timeout";
203     public static final long TIMEOUT_DEFAULT = TimeUnit.MILLISECONDS.convert(15, TimeUnit.MINUTES);
204 
205     /**
206      * When timeout expired and initialization has not complete, call {@link System#exit(int)} when
207      * true, do nothing otherwise.
208      */
209     public static final String HALT_KEY = "hbase.master.initializationmonitor.haltontimeout";
210     public static final boolean HALT_DEFAULT = false;
211 
212     private final HMaster master;
213     private final long timeout;
214     private final boolean haltOnTimeout;
215 
216     /** Creates a Thread that monitors the {@link #isInitialized()} state. */
217     InitializationMonitor(HMaster master) {
218       super("MasterInitializationMonitor");
219       this.master = master;
220       this.timeout = master.getConfiguration().getLong(TIMEOUT_KEY, TIMEOUT_DEFAULT);
221       this.haltOnTimeout = master.getConfiguration().getBoolean(HALT_KEY, HALT_DEFAULT);
222       this.setDaemon(true);
223     }
224 
225     @Override
226     public void run() {
227       try {
228         while (!master.isStopped() && master.isActiveMaster()) {
229           Thread.sleep(timeout);
230           if (master.isInitialized()) {
231             LOG.debug("Initialization completed within allotted tolerance. Monitor exiting.");
232           } else {
233             LOG.error("Master failed to complete initialization after " + timeout + "ms. Please"
234                 + " consider submitting a bug report including a thread dump of this process.");
235             if (haltOnTimeout) {
236               LOG.error("Zombie Master exiting. Thread dump to stdout");
237               Threads.printThreadInfo(System.out, "Zombie HMaster");
238               System.exit(-1);
239             }
240           }
241         }
242       } catch (InterruptedException ie) {
243         LOG.trace("InitMonitor thread interrupted. Existing.");
244       }
245     }
246   }
247 
  // MASTER is the name of the webapp and the attribute name used for stuffing this
  // instance into the web context.
  public static final String MASTER = "master";

  // Manager and zk listener for master election; null when "hbase.testing.nocluster" is set.
  private final ActiveMasterManager activeMasterManager;
  // Tracks region servers registered in zookeeper.
  RegionServerTracker regionServerTracker;
  // Tracks region servers being drained of regions.
  private DrainingServerTracker drainingServerTracker;
  // Tracker for load balancer on/off state kept in zookeeper.
  LoadBalancerTracker loadBalancerTracker;

  // Tracker for split and merge switch state kept in zookeeper.
  SplitOrMergeTracker splitOrMergeTracker;

  // Tracker for region normalizer on/off state kept in zookeeper.
  private RegionNormalizerTracker regionNormalizerTracker;

  // Schema (namespace/table) service; not initialized within this chunk.
  private ClusterSchemaService clusterSchemaService;

  // Metrics for the HMaster.
  final MetricsMaster metricsMaster;
  // File system manager for the master's filesystem operations.
  private MasterFileSystem fileSystemManager;

  // Server manager to deal with region server info.
  volatile ServerManager serverManager;

  // Manager of assignment nodes in zookeeper.
  AssignmentManager assignmentManager;

  // Buffer for "fatal error" notices from region servers
  // in the cluster. This is only used for assisting
  // operations/debugging.
  MemoryBoundedLogMessageBuffer rsFatals;

  // Flag set after we become the active master (used for testing).
  private volatile boolean isActiveMaster = false;

  // Procedure event signaled once initialization completes after becoming active.
  // NOTE(review): an older comment said this is "not private since it's used in unit
  // tests", but the field is private here — presumably tests go through an accessor.
  private final ProcedureEvent initialized = new ProcedureEvent("master initialized");

  // Flag set after master services are started;
  // initialization may have not completed yet.
  volatile boolean serviceStarted = false;

  // Procedure event signaled after we complete assignMeta.
  private final ProcedureEvent serverCrashProcessingEnabled =
    new ProcedureEvent("server crash processing");

  // Balancer/normalizer plus the chores that drive them periodically.
  LoadBalancer balancer;
  private RegionNormalizer normalizer;
  private BalancerChore balancerChore;
  private RegionNormalizerChore normalizerChore;
  private ClusterStatusChore clusterStatusChore;
  private ClusterStatusPublisher clusterStatusPublisherChore = null;

  // Housekeeping chores: catalog janitor, old-WAL and archived-HFile cleaners.
  CatalogJanitor catalogJanitorChore;
  private LogCleaner logCleaner;
  private HFileCleaner hfileCleaner;
  // MOB (medium object) housekeeping chores and compaction thread.
  private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
  private MobCompactionChore mobCompactChore;
  private MasterMobCompactionThread mobCompactThread;
  // Used to synchronize access to mobCompactionStates.
  private final IdLock mobCompactionLock = new IdLock();
  // Bookkeeping of mob compactions in tables:
  // the key is the table name, the value is the number of compactions in that table.
  private Map<TableName, AtomicInteger> mobCompactionStates = Maps.newConcurrentMap();

  // Coprocessor host for master coprocessors.
  MasterCoprocessorHost cpHost;

  // Whether to pre-load all table descriptors at startup (default true).
  private final boolean preLoadTableDescriptors;

  // Time stamp for when this hmaster became the active master.
  private long masterActiveTime;

  // Should we check the compression codec type at master side, default true, HBASE-6370.
  private final boolean masterCheckCompression;

  // Should we check encryption settings at master side, default true.
  private final boolean masterCheckEncryption;

  // Coprocessor-provided RPC services, keyed by service name.
  Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  // Monitor for snapshots of hbase tables.
  SnapshotManager snapshotManager;
  // Monitor for distributed procedures.
  private MasterProcedureManagerHost mpmHost;

  // It is assigned after the 'initialized' guard is set to true, so it should be volatile.
  private volatile MasterQuotaManager quotaManager;

  // Procedure-v2 executor and its WAL-backed persistent store.
  private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
  private WALProcedureStore procedureStore;

  // Handles table state transitions.
  private TableStateManager tableStateManager;

  // Presumably counts of split/merge normalization plans issued — confirm against the
  // normalizer chore before relying on exact semantics.
  private long splitPlanCount;
  private long mergePlanCount;

  /** flag used in test cases in order to simulate RS failures during master initialization */
  private volatile boolean initializationBeforeMetaAssignment = false;

  /** jetty server for master to redirect requests to regionserver infoServer */
  private org.mortbay.jetty.Server masterJettyServer;
356 
  /**
   * Servlet that redirects any GET on the master's original info port to the same
   * URI on the region server info server co-hosted with this master.
   */
  public static class RedirectServlet extends HttpServlet {
    private static final long serialVersionUID = 2894774810058302472L;
    // NOTE(review): static mutable state shared by every servlet instance. It is assigned
    // once in putUpJettyServer() before the redirecting jetty server starts; confirm no
    // write happens after requests begin before treating reads here as race-free.
    private static int regionServerInfoPort;

    @Override
    public void doGet(HttpServletRequest request,
        HttpServletResponse response) throws ServletException, IOException {
      // Keep scheme, host and request URI; only substitute the region server info port.
      String redirectUrl = request.getScheme() + "://"
        + request.getServerName() + ":" + regionServerInfoPort
        + request.getRequestURI();
      response.sendRedirect(redirectUrl);
    }
  }
370 
  /**
   * Initializes the HMaster. The steps are as follows:
   * <p>
   * <ol>
   * <li>Initialize the local HRegionServer
   * <li>Start the ActiveMasterManager.
   * </ol>
   * <p>
   * Remaining steps of initialization occur in
   * #finishActiveMasterInitialization(MonitoredTask) after
   * the master becomes the active one.
   *
   * @param conf configuration to use
   * @param csm coordinated state manager, passed through to the HRegionServer superclass
   * @throws IOException if superclass or jetty/redirect setup fails
   * @throws KeeperException on zookeeper errors during active-master setup
   */
  public HMaster(final Configuration conf, CoordinatedStateManager csm)
      throws IOException, KeeperException {
    super(conf, csm);
    // Bounded in-memory buffer of fatal errors reported by region servers (ops/debugging aid).
    this.rsFatals = new MemoryBoundedLogMessageBuffer(
      conf.getLong("hbase.master.buffer.for.rs.fatals", 1*1024*1024));

    LOG.info("hbase.rootdir=" + FSUtils.getRootDir(this.conf) +
      ", hbase.cluster.distributed=" + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));

    // Disable usage of meta replicas in the master.
    this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);

    Replication.decorateMasterConfiguration(this.conf);

    // Hack! Maps DFSClient => Master for logs.  HDFS made this
    // config param for task trackers, but we can piggyback off of it.
    if (this.conf.get("mapreduce.task.attempt.id") == null) {
      this.conf.set("mapreduce.task.attempt.id", "hb_m_" + this.serverName.toString());
    }

    // Should we check the compression codec type at master side, default true, HBASE-6370.
    this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true);

    // Should we check encryption settings at master side, default true.
    this.masterCheckEncryption = conf.getBoolean("hbase.master.check.encryption", true);

    this.metricsMaster = new MetricsMaster(new MetricsMasterWrapperImpl(this));

    // Preload table descriptors at startup.
    this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);

    // Do we publish the status?
    boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
        HConstants.STATUS_PUBLISHED_DEFAULT);
    Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
        conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
            ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
            ClusterStatusPublisher.Publisher.class);

    if (shouldPublish) {
      if (publisherClass == null) {
        // Publishing requested but no publisher implementation configured: warn and skip.
        LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
            ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS +
            " is not set - not publishing status");
      } else {
        clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
        getChoreService().scheduleChore(clusterStatusPublisherChore);
      }
    }

    // Some unit tests don't need a cluster, so no zookeeper at all.
    if (!conf.getBoolean("hbase.testing.nocluster", false)) {
      activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this);
      int infoPort = putUpJettyServer();
      startActiveMasterManager(infoPort);
    } else {
      activeMasterManager = null;
    }
  }
443 
  /**
   * Puts up a jetty server that redirects requests from the configured original
   * master info port to the co-hosted region server's info server.
   * @return the actual info port in use; -1 means the info server is disabled
   *         (or redirecting is turned off / unnecessary).
   * @throws IOException if the bind address is not local or jetty fails to start
   */
  private int putUpJettyServer() throws IOException {
    if (!conf.getBoolean("hbase.master.infoserver.redirect", true)) {
      return -1;
    }
    int infoPort = conf.getInt("hbase.master.info.port.orig",
      HConstants.DEFAULT_MASTER_INFOPORT);
    // -1 is for disabling info server, so no redirecting.
    if (infoPort < 0 || infoServer == null) {
      return -1;
    }
    String addr = conf.get("hbase.master.info.bindAddress", "0.0.0.0");
    if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
      String msg =
          "Failed to start redirecting jetty server. Address " + addr
              + " does not belong to this host. Correct configuration parameter: "
              + "hbase.master.info.bindAddress";
      LOG.error(msg);
      throw new IOException(msg);
    }

    // Tell the redirect servlet (static field) which port to redirect to.
    RedirectServlet.regionServerInfoPort = infoServer.getPort();
    // If the redirect target port equals the original port, nothing to redirect.
    if(RedirectServlet.regionServerInfoPort == infoPort) {
      return infoPort;
    }
    masterJettyServer = new org.mortbay.jetty.Server();
    Connector connector = new SelectChannelConnector();
    connector.setHost(addr);
    connector.setPort(infoPort);
    masterJettyServer.addConnector(connector);
    masterJettyServer.setStopAtShutdown(true);
    // Redirect every URI ("/*"); sessions are unnecessary for plain redirects.
    Context context = new Context(masterJettyServer, "/", Context.NO_SESSIONS);
    context.addServlet(RedirectServlet.class, "/*");
    try {
      masterJettyServer.start();
    } catch (Exception e) {
      throw new IOException("Failed to start redirecting jetty server", e);
    }
    return connector.getLocalPort();
  }
484 
  /**
   * Returns the filesystem-backed table descriptors from the superclass.
   * NOTE(review): this override adds no behavior; presumably kept as an explicit
   * extension/visibility point — confirm before removing.
   */
  @Override
  protected TableDescriptors getFsTableDescriptors() throws IOException {
    return super.getFsTableDescriptors();
  }
489 
490   /**
491    * For compatibility, if failed with regionserver credentials, try the master one
492    */
493   @Override
494   protected void login(UserProvider user, String host) throws IOException {
495     try {
496       super.login(user, host);
497     } catch (IOException ie) {
498       user.login("hbase.master.keytab.file",
499         "hbase.master.kerberos.principal", host);
500     }
501   }
502 
503   /**
504    * If configured to put regions on active master,
505    * wait till a backup master becomes active.
506    * Otherwise, loop till the server is stopped or aborted.
507    */
508   @Override
509   protected void waitForMasterActive(){
510     boolean tablesOnMaster = BaseLoadBalancer.tablesOnMaster(conf);
511     while (!(tablesOnMaster && isActiveMaster)
512         && !isStopped() && !isAborted()) {
513       sleeper.sleep();
514     }
515   }
516 
  /** @return this master's RPC services, down-cast to the master-specific type; for tests. */
  @VisibleForTesting
  public MasterRpcServices getMasterRpcServices() {
    return (MasterRpcServices)rpcServices;
  }
521 
  /**
   * Turn the load balancer on or off, asynchronously.
   * @param b true to enable the balancer
   * @return the boolean reported by switchBalancer — presumably the previous state;
   *         see MasterRpcServices#switchBalancer to confirm.
   */
  public boolean balanceSwitch(final boolean b) throws IOException {
    return getMasterRpcServices().switchBalancer(b, BalanceSwitchMode.ASYNC);
  }
525 
  /** @return the process name ("master"), used for the webapp and web-context attribute. */
  @Override
  protected String getProcessName() {
    return MASTER;
  }
530 
  /** The master is permitted to create the base znodes in zookeeper. */
  @Override
  protected boolean canCreateBaseZNode() {
    return true;
  }
535 
  /** The master is permitted to update table descriptors. */
  @Override
  protected boolean canUpdateTableDescriptor() {
    return true;
  }
540 
  /** Create the master-specific RPC services implementation. */
  @Override
  protected RSRpcServices createRpcServices() throws IOException {
    return new MasterRpcServices(this);
  }
545 
  /**
   * Register master servlets on the embedded info server. Region-server status
   * pages are added too, but only when this master also carries regions.
   */
  @Override
  protected void configureInfoServer() {
    infoServer.addServlet("master-status", "/master-status", MasterStatusServlet.class);
    infoServer.setAttribute(MASTER, this);
    if (BaseLoadBalancer.tablesOnMaster(conf)) {
      super.configureInfoServer();
    }
  }
554 
  /** @return the debug-dump servlet class used by the master info server. */
  @Override
  protected Class<? extends HttpServlet> getDumpServlet() {
    return MasterDumpServlet.class;
  }
559 
560   /**
561    * Emit the HMaster metrics, such as region in transition metrics.
562    * Surrounding in a try block just to be sure metrics doesn't abort HMaster.
563    */
564   @Override
565   protected void doMetrics() {
566     try {
567       if (assignmentManager != null) {
568         assignmentManager.updateRegionsInTransitionMetrics();
569       }
570     } catch (Throwable e) {
571       LOG.error("Couldn't update metrics: " + e.getMessage());
572     }
573   }
574 
  /** @return the metrics container for this master. */
  MetricsMaster getMasterMetrics() {
    return metricsMaster;
  }
578 
  /**
   * Initialize all ZK based system trackers: balancer/normalizer/split-merge switch
   * trackers, region server and draining server trackers, plus the assignment manager
   * and procedure manager host. Also flips the cluster-up flag in zookeeper if unset.
   * NOTE(review): initialization order matters here (e.g. assignmentManager must exist
   * before the region server trackers start delivering events) — do not reorder casually.
   */
  void initializeZKBasedSystemTrackers() throws IOException,
      InterruptedException, KeeperException, CoordinatedStateException {
    this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
    this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
    this.normalizer.setMasterServices(this);
    this.normalizer.setMasterRpcServices((MasterRpcServices)rpcServices);
    this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
    this.loadBalancerTracker.start();

    this.regionNormalizerTracker = new RegionNormalizerTracker(zooKeeper, this);
    this.regionNormalizerTracker.start();

    this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
    this.splitOrMergeTracker.start();

    this.assignmentManager = new AssignmentManager(this, serverManager,
      this.balancer, this.service, this.metricsMaster,
      this.tableLockManager, tableStateManager);

    this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager);
    this.regionServerTracker.start();

    this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
    this.drainingServerTracker.start();

    // Set the cluster as up.  If new RSs, they'll be waiting on this before
    // going ahead with their startup.
    boolean wasUp = this.clusterStatusTracker.isClusterUp();
    if (!wasUp) this.clusterStatusTracker.setClusterUp();

    LOG.info("Server active/primary master=" + this.serverName +
        ", sessionid=0x" +
        Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
        ", setting cluster-up flag (Was=" + wasUp + ")");

    // Create/initialize the snapshot manager and other procedure managers.
    this.snapshotManager = new SnapshotManager();
    this.mpmHost = new MasterProcedureManagerHost();
    this.mpmHost.register(this.snapshotManager);
    this.mpmHost.register(new MasterFlushTableProcedureManager());
    this.mpmHost.loadProcedures(conf);
    this.mpmHost.initialize(this, this.metricsMaster);
  }
626 
627   /**
628    * Finish initialization of HMaster after becoming the primary master.
629    *
630    * <ol>
631    * <li>Initialize master components - file system manager, server manager,
632    *     assignment manager, region server tracker, etc</li>
633    * <li>Start necessary service threads - balancer, catalog janior,
634    *     executor services, etc</li>
635    * <li>Set cluster as UP in ZooKeeper</li>
636    * <li>Wait for RegionServers to check-in</li>
637    * <li>Split logs and perform data recovery, if necessary</li>
638    * <li>Ensure assignment of meta/namespace regions<li>
639    * <li>Handle either fresh cluster start or master failover</li>
640    * </ol>
641    */
642   private void finishActiveMasterInitialization(MonitoredTask status)
643       throws IOException, InterruptedException, KeeperException, CoordinatedStateException {
644 
645     isActiveMaster = true;
646     Thread zombieDetector = new Thread(new InitializationMonitor(this));
647     zombieDetector.start();
648 
649     /*
650      * We are active master now... go initialize components we need to run.
651      * Note, there may be dross in zk from previous runs; it'll get addressed
652      * below after we determine if cluster startup or failover.
653      */
654 
655     status.setStatus("Initializing Master file system");
656 
657     this.masterActiveTime = System.currentTimeMillis();
658     // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
659     this.fileSystemManager = new MasterFileSystem(this, this);
660 
661     // enable table descriptors cache
662     this.tableDescriptors.setCacheOn();
663     // set the META's descriptor to the correct replication
664     this.tableDescriptors.get(TableName.META_TABLE_NAME).setRegionReplication(
665         conf.getInt(HConstants.META_REPLICAS_NUM, HConstants.DEFAULT_META_REPLICA_NUM));
666     // warm-up HTDs cache on master initialization
667     if (preLoadTableDescriptors) {
668       status.setStatus("Pre-loading table descriptors");
669       this.tableDescriptors.getAll();
670     }
671 
672     // publish cluster ID
673     status.setStatus("Publishing Cluster ID in ZooKeeper");
674     ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
675 
676     this.serverManager = createServerManager(this, this);
677 
678     // Invalidate all write locks held previously
679     this.tableLockManager.reapWriteLocks();
680     this.tableStateManager = new TableStateManager(this);
681 
682     status.setStatus("Initializing ZK system trackers");
683     initializeZKBasedSystemTrackers();
684 
685     // This is for backwards compatibility
686     // See HBASE-11393
687     status.setStatus("Update TableCFs node in ZNode");
688     TableCFsUpdater tableCFsUpdater = new TableCFsUpdater(zooKeeper,
689             conf, this.clusterConnection);
690     tableCFsUpdater.update();
691 
692     // initialize master side coprocessors before we start handling requests
693     status.setStatus("Initializing master coprocessors");
694     this.cpHost = new MasterCoprocessorHost(this, this.conf);
695 
696     // start up all service threads.
697     status.setStatus("Initializing master service threads");
698     startServiceThreads();
699 
700     // Wake up this server to check in
701     sleeper.skipSleepCycle();
702 
703     // Wait for region servers to report in
704     this.serverManager.waitForRegionServers(status);
705     // Check zk for region servers that are up but didn't register
706     for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
707       // The isServerOnline check is opportunistic, correctness is handled inside
708       if (!this.serverManager.isServerOnline(sn)
709           && serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
710         LOG.info("Registered server found up in zk but who has not yet reported in: " + sn);
711       }
712     }
713 
714     // get a list for previously failed RS which need log splitting work
715     // we recover hbase:meta region servers inside master initialization and
716     // handle other failed servers in SSH in order to start up master node ASAP
717     Set<ServerName> previouslyFailedServers =
718       this.fileSystemManager.getFailedServersFromLogFolders();
719 
720     // log splitting for hbase:meta server
721     ServerName oldMetaServerLocation = metaTableLocator.getMetaRegionLocation(this.getZooKeeper());
722     if (oldMetaServerLocation != null && previouslyFailedServers.contains(oldMetaServerLocation)) {
723       splitMetaLogBeforeAssignment(oldMetaServerLocation);
724       // Note: we can't remove oldMetaServerLocation from previousFailedServers list because it
725       // may also host user regions
726     }
727     Set<ServerName> previouslyFailedMetaRSs = getPreviouselyFailedMetaServersFromZK();
728     // need to use union of previouslyFailedMetaRSs recorded in ZK and previouslyFailedServers
729     // instead of previouslyFailedMetaRSs alone to address the following two situations:
730     // 1) the chained failure situation(recovery failed multiple times in a row).
731     // 2) master get killed right before it could delete the recovering hbase:meta from ZK while the
732     // same server still has non-meta wals to be replayed so that
733     // removeStaleRecoveringRegionsFromZK can't delete the stale hbase:meta region
734     // Passing more servers into splitMetaLog is all right. If a server doesn't have hbase:meta wal,
735     // there is no op for the server.
736     previouslyFailedMetaRSs.addAll(previouslyFailedServers);
737 
738     this.initializationBeforeMetaAssignment = true;
739 
740     // Wait for regionserver to finish initialization.
741     if (BaseLoadBalancer.tablesOnMaster(conf)) {
742       waitForServerOnline();
743     }
744 
745     //initialize load balancer
746     this.balancer.setClusterStatus(getClusterStatus());
747     this.balancer.setMasterServices(this);
748     this.balancer.initialize();
749 
750     // Check if master is shutting down because of some issue
751     // in initializing the regionserver or the balancer.
752     if (isStopped()) return;
753 
754     // Make sure meta assigned before proceeding.
755     status.setStatus("Assigning Meta Region");
756     assignMeta(status, previouslyFailedMetaRSs, HRegionInfo.DEFAULT_REPLICA_ID);
757     // check if master is shutting down because above assignMeta could return even hbase:meta isn't
758     // assigned when master is shutting down
759     if (isStopped()) return;
760 
761     // migrating existent table state from zk, so splitters
762     // and recovery process treat states properly.
763     for (Map.Entry<TableName, TableState.State> entry : ZKDataMigrator
764         .queryForTableStates(getZooKeeper()).entrySet()) {
765       LOG.info("Converting state from zk to new states:" + entry);
766       tableStateManager.setTableState(entry.getKey(), entry.getValue());
767     }
768     ZKUtil.deleteChildrenRecursively(getZooKeeper(), getZooKeeper().tableZNode);
769 
770     status.setStatus("Submitting log splitting work for previously failed region servers");
771     // Master has recovered hbase:meta region server and we put
772     // other failed region servers in a queue to be handled later by SSH
773     for (ServerName tmpServer : previouslyFailedServers) {
774       this.serverManager.processDeadServer(tmpServer, true);
775     }
776 
777     // Fix up assignment manager status
778     status.setStatus("Starting assignment manager");
779     this.assignmentManager.joinCluster();
780 
781     // set cluster status again after user regions are assigned
782     this.balancer.setClusterStatus(getClusterStatus());
783 
784     // Start balancer and meta catalog janitor after meta and regions have been assigned.
785     status.setStatus("Starting balancer and catalog janitor");
786     this.clusterStatusChore = new ClusterStatusChore(this, balancer);
787     getChoreService().scheduleChore(clusterStatusChore);
788     this.balancerChore = new BalancerChore(this);
789     getChoreService().scheduleChore(balancerChore);
790     this.normalizerChore = new RegionNormalizerChore(this);
791     getChoreService().scheduleChore(normalizerChore);
792     this.catalogJanitorChore = new CatalogJanitor(this, this);
793     getChoreService().scheduleChore(catalogJanitorChore);
794 
795     status.setStatus("Starting cluster schema service");
796     initClusterSchemaService();
797 
798     if (this.cpHost != null) {
799       try {
800         this.cpHost.preMasterInitialization();
801       } catch (IOException e) {
802         LOG.error("Coprocessor preMasterInitialization() hook failed", e);
803       }
804     }
805 
806     status.markComplete("Initialization successful");
807     LOG.info("Master has completed initialization");
808     configurationManager.registerObserver(this.balancer);
809 
810     // Set master as 'initialized'.
811     setInitialized(true);
812 
813     // assign the meta replicas
814     Set<ServerName> EMPTY_SET = new HashSet<ServerName>();
815     int numReplicas = conf.getInt(HConstants.META_REPLICAS_NUM,
816            HConstants.DEFAULT_META_REPLICA_NUM);
817     for (int i = 1; i < numReplicas; i++) {
818       assignMeta(status, EMPTY_SET, i);
819     }
820     unassignExcessMetaReplica(zooKeeper, numReplicas);
821 
822     status.setStatus("Starting quota manager");
823     initQuotaManager();
824 
825     // clear the dead servers with same host name and port of online server because we are not
826     // removing dead server with same hostname and port of rs which is trying to check in before
827     // master initialization. See HBASE-5916.
828     this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
829 
830     // Check and set the znode ACLs if needed in case we are overtaking a non-secure configuration
831     status.setStatus("Checking ZNode ACLs");
832     zooKeeper.checkAndSetZNodeAcls();
833 
834     status.setStatus("Calling postStartMaster coprocessors");
835 
836     this.expiredMobFileCleanerChore = new ExpiredMobFileCleanerChore(this);
837     getChoreService().scheduleChore(expiredMobFileCleanerChore);
838 
839     int mobCompactionPeriod = conf.getInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD,
840       MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD);
841     if (mobCompactionPeriod > 0) {
842       this.mobCompactChore = new MobCompactionChore(this, mobCompactionPeriod);
843       getChoreService().scheduleChore(mobCompactChore);
844     } else {
845       LOG
846         .info("The period is " + mobCompactionPeriod + " seconds, MobCompactionChore is disabled");
847     }
848     this.mobCompactThread = new MasterMobCompactionThread(this);
849 
850     if (this.cpHost != null) {
851       // don't let cp initialization errors kill the master
852       try {
853         this.cpHost.postStartMaster();
854       } catch (IOException ioe) {
855         LOG.error("Coprocessor postStartMaster() hook failed", ioe);
856       }
857     }
858 
859     zombieDetector.interrupt();
860   }
861   /**
862    * Create a {@link ServerManager} instance.
863    */
864   ServerManager createServerManager(final Server master,
865       final MasterServices services)
866   throws IOException {
867     // We put this out here in a method so can do a Mockito.spy and stub it out
868     // w/ a mocked up ServerManager.
869     setupClusterConnection();
870     return new ServerManager(master, services);
871   }
872 
873   private void unassignExcessMetaReplica(ZooKeeperWatcher zkw, int numMetaReplicasConfigured) {
874     // unassign the unneeded replicas (for e.g., if the previous master was configured
875     // with a replication of 3 and now it is 2, we need to unassign the 1 unneeded replica)
876     try {
877       List<String> metaReplicaZnodes = zooKeeper.getMetaReplicaNodes();
878       for (String metaReplicaZnode : metaReplicaZnodes) {
879         int replicaId = zooKeeper.getMetaReplicaIdFromZnode(metaReplicaZnode);
880         if (replicaId >= numMetaReplicasConfigured) {
881           RegionState r = MetaTableLocator.getMetaRegionState(zkw, replicaId);
882           LOG.info("Closing excess replica of meta region " + r.getRegion());
883           // send a close and wait for a max of 30 seconds
884           ServerManager.closeRegionSilentlyAndWait(getClusterConnection(), r.getServerName(),
885               r.getRegion(), 30000);
886           ZKUtil.deleteNode(zkw, zkw.getZNodeForReplica(replicaId));
887         }
888       }
889     } catch (Exception ex) {
890       // ignore the exception since we don't want the master to be wedged due to potential
891       // issues in the cleanup of the extra regions. We can do that cleanup via hbck or manually
892       LOG.warn("Ignoring exception " + ex);
893     }
894   }
895 
896   /**
897    * Check <code>hbase:meta</code> is assigned. If not, assign it.
898    */
899   void assignMeta(MonitoredTask status, Set<ServerName> previouslyFailedMetaRSs, int replicaId)
900       throws InterruptedException, IOException, KeeperException {
901     // Work on meta region
902     int assigned = 0;
903     long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000);
904     if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
905       status.setStatus("Assigning hbase:meta region");
906     } else {
907       status.setStatus("Assigning hbase:meta region, replicaId " + replicaId);
908     }
909 
910     // Get current meta state from zk.
911     RegionState metaState = MetaTableLocator.getMetaRegionState(getZooKeeper(), replicaId);
912     HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO,
913         replicaId);
914     RegionStates regionStates = assignmentManager.getRegionStates();
915     regionStates.createRegionState(hri, metaState.getState(),
916         metaState.getServerName(), null);
917 
918     if (!metaState.isOpened() || !metaTableLocator.verifyMetaRegionLocation(
919         this.getClusterConnection(), this.getZooKeeper(), timeout, replicaId)) {
920       ServerName currentMetaServer = metaState.getServerName();
921       if (serverManager.isServerOnline(currentMetaServer)) {
922         if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
923           LOG.info("Meta was in transition on " + currentMetaServer);
924         } else {
925           LOG.info("Meta with replicaId " + replicaId + " was in transition on " +
926                     currentMetaServer);
927         }
928         assignmentManager.processRegionsInTransition(Arrays.asList(metaState));
929       } else {
930         if (currentMetaServer != null) {
931           if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
932             splitMetaLogBeforeAssignment(currentMetaServer);
933             regionStates.logSplit(HRegionInfo.FIRST_META_REGIONINFO);
934             previouslyFailedMetaRSs.add(currentMetaServer);
935           }
936         }
937         LOG.info("Re-assigning hbase:meta with replicaId, " + replicaId +
938             " it was on " + currentMetaServer);
939         assignmentManager.assignMeta(hri);
940       }
941       assigned++;
942     }
943 
944     if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID)
945       getTableStateManager().setTableState(TableName.META_TABLE_NAME, TableState.State.ENABLED);
946     // TODO: should we prevent from using state manager before meta was initialized?
947     // tableStateManager.start();
948 
949     if ((RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode())
950         && (!previouslyFailedMetaRSs.isEmpty())) {
951       // replay WAL edits mode need new hbase:meta RS is assigned firstly
952       status.setStatus("replaying log for Meta Region");
953       this.fileSystemManager.splitMetaLog(previouslyFailedMetaRSs);
954     }
955 
956     this.assignmentManager.setEnabledTable(TableName.META_TABLE_NAME);
957     tableStateManager.start();
958 
959     // Make sure a hbase:meta location is set. We need to enable SSH here since
960     // if the meta region server is died at this time, we need it to be re-assigned
961     // by SSH so that system tables can be assigned.
962     // No need to wait for meta is assigned = 0 when meta is just verified.
963     if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) enableCrashedServerProcessing(assigned != 0);
964     LOG.info("hbase:meta with replicaId " + replicaId + " assigned=" + assigned + ", location="
965       + metaTableLocator.getMetaRegionLocation(this.getZooKeeper(), replicaId));
966     status.setStatus("META assigned.");
967   }
968 
969   void initClusterSchemaService() throws IOException, InterruptedException {
970     this.clusterSchemaService = new ClusterSchemaServiceImpl(this);
971     this.clusterSchemaService.startAndWait();
972     if (!this.clusterSchemaService.isRunning()) throw new HBaseIOException("Failed start");
973   }
974 
975   void initQuotaManager() throws IOException {
976     MasterQuotaManager quotaManager = new MasterQuotaManager(this);
977     this.assignmentManager.setRegionStateListener((RegionStateListener)quotaManager);
978     quotaManager.start();
979     this.quotaManager = quotaManager;
980   }
981 
982   boolean isCatalogJanitorEnabled() {
983     return catalogJanitorChore != null ?
984       catalogJanitorChore.getEnabled() : false;
985   }
986 
987   private void splitMetaLogBeforeAssignment(ServerName currentMetaServer) throws IOException {
988     if (RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode()) {
989       // In log replay mode, we mark hbase:meta region as recovering in ZK
990       Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
991       regions.add(HRegionInfo.FIRST_META_REGIONINFO);
992       this.fileSystemManager.prepareLogReplay(currentMetaServer, regions);
993     } else {
994       // In recovered.edits mode: create recovered edits file for hbase:meta server
995       this.fileSystemManager.splitMetaLog(currentMetaServer);
996     }
997   }
998 
  /**
   * Enable ServerCrashProcedure processing (if not already enabled) and optionally block
   * until a hbase:meta location has been set.
   * @param waitForMeta if true, wait for the meta region location before returning
   * @throws IOException propagated from processing queued dead servers
   * @throws InterruptedException if interrupted while waiting on the meta location
   */
  private void enableCrashedServerProcessing(final boolean waitForMeta)
  throws IOException, InterruptedException {
    // If crashed server processing is disabled, we enable it and expire those dead but not expired
    // servers. This is required so that if meta is assigning to a server which dies after
    // assignMeta starts assignment, ServerCrashProcedure can re-assign it. Otherwise, we will be
    // stuck here waiting forever if waitForMeta is specified.
    if (!isServerCrashProcessingEnabled()) {
      setServerCrashProcessingEnabled(true);
      this.serverManager.processQueuedDeadServers();
    }

    if (waitForMeta) {
      metaTableLocator.waitMetaRegionLocation(this.getZooKeeper());
    }
  }
1014 
1015   /**
1016    * This function returns a set of region server names under hbase:meta recovering region ZK node
1017    * @return Set of meta server names which were recorded in ZK
1018    */
1019   private Set<ServerName> getPreviouselyFailedMetaServersFromZK() throws KeeperException {
1020     Set<ServerName> result = new HashSet<ServerName>();
1021     String metaRecoveringZNode = ZKUtil.joinZNode(zooKeeper.recoveringRegionsZNode,
1022       HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
1023     List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(zooKeeper, metaRecoveringZNode);
1024     if (regionFailedServers == null) return result;
1025 
1026     for(String failedServer : regionFailedServers) {
1027       ServerName server = ServerName.parseServerName(failedServer);
1028       result.add(server);
1029     }
1030     return result;
1031   }
1032 
  /** @return the master's cached TableDescriptors instance. */
  @Override
  public TableDescriptors getTableDescriptors() {
    return this.tableDescriptors;
  }
1037 
  /** @return the ServerManager created during master initialization; null before that. */
  @Override
  public ServerManager getServerManager() {
    return this.serverManager;
  }
1042 
  /** @return the MasterFileSystem created during master initialization; null before that. */
  @Override
  public MasterFileSystem getMasterFileSystem() {
    return this.fileSystemManager;
  }
1047 
  /** @return the TableStateManager created during master initialization; null before that. */
  @Override
  public TableStateManager getTableStateManager() {
    return tableStateManager;
  }
1052 
1053   /*
1054    * Start up all services. If any of these threads gets an unhandled exception
1055    * then they just die with a logged message.  This should be fine because
1056    * in general, we do not expect the master to get such unhandled exceptions
1057    *  as OOMEs; it should be lightly loaded. See what HRegionServer does if
1058    *  need to install an unexpected exception handler.
1059    */
1060   private void startServiceThreads() throws IOException{
1061    // Start the executor service pools
1062    this.service.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
1063       conf.getInt("hbase.master.executor.openregion.threads", 5));
1064    this.service.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
1065       conf.getInt("hbase.master.executor.closeregion.threads", 5));
1066    this.service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
1067       conf.getInt("hbase.master.executor.serverops.threads", 5));
1068    this.service.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
1069       conf.getInt("hbase.master.executor.meta.serverops.threads", 5));
1070    this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
1071       conf.getInt("hbase.master.executor.logreplayops.threads", 10));
1072 
1073    // We depend on there being only one instance of this executor running
1074    // at a time.  To do concurrency, would need fencing of enable/disable of
1075    // tables.
1076    // Any time changing this maxThreads to > 1, pls see the comment at
1077    // AccessController#postCreateTableHandler
1078    this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1079    startProcedureExecutor();
1080 
1081    // Start log cleaner thread
1082    int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
1083    this.logCleaner =
1084       new LogCleaner(cleanerInterval,
1085          this, conf, getMasterFileSystem().getFileSystem(),
1086          getMasterFileSystem().getOldLogDir());
1087     getChoreService().scheduleChore(logCleaner);
1088 
1089    //start the hfile archive cleaner thread
1090     Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
1091     this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
1092         .getFileSystem(), archiveDir);
1093     getChoreService().scheduleChore(hfileCleaner);
1094     serviceStarted = true;
1095     if (LOG.isTraceEnabled()) {
1096       LOG.trace("Started service threads");
1097     }
1098   }
1099 
  @Override
  protected void sendShutdownInterrupt() {
    super.sendShutdownInterrupt();
    // Also stop the procedure executor and its store as part of the shutdown interrupt.
    stopProcedureExecutor();
  }
1105 
  @Override
  protected void stopServiceThreads() {
    // Stop the embedded jetty info server first, if one was started.
    if (masterJettyServer != null) {
      LOG.info("Stopping master jetty server");
      try {
        masterJettyServer.stop();
      } catch (Exception e) {
        LOG.error("Failed to stop master jetty server", e);
      }
    }
    super.stopServiceThreads();
    stopChores();

    // Wait for all the remaining region servers to report in IFF we were
    // running a cluster shutdown AND we were NOT aborting.
    if (!isAborted() && this.serverManager != null &&
        this.serverManager.isClusterShutdown()) {
      this.serverManager.letRegionServersShutdown();
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Stopping service threads");
    }
    // Clean up and close up shop: cancel cleaner chores and stop the managers.
    // Null checks guard against a shutdown that happens before initialization completed.
    if (this.logCleaner != null) this.logCleaner.cancel(true);
    if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
    if (this.quotaManager != null) this.quotaManager.stop();
    if (this.activeMasterManager != null) this.activeMasterManager.stop();
    if (this.serverManager != null) this.serverManager.stop();
    if (this.assignmentManager != null) this.assignmentManager.stop();
    if (this.fileSystemManager != null) this.fileSystemManager.stop();
    if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
  }
1138 
1139   private void startProcedureExecutor() throws IOException {
1140     final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
1141     final Path logDir = new Path(fileSystemManager.getRootDir(),
1142         MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
1143 
1144     procedureStore = new WALProcedureStore(conf, fileSystemManager.getFileSystem(), logDir,
1145         new MasterProcedureEnv.WALStoreLeaseRecovery(this));
1146     procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
1147     procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore,
1148         procEnv.getProcedureQueue());
1149 
1150     final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
1151         Math.max(Runtime.getRuntime().availableProcessors(),
1152           MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
1153     final boolean abortOnCorruption = conf.getBoolean(
1154         MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
1155         MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
1156     procedureStore.start(numThreads);
1157     procedureExecutor.start(numThreads, abortOnCorruption);
1158   }
1159 
  /**
   * Stops the procedure executor, then the procedure store (passing whether the master
   * is aborting). Null checks guard against shutdown before initialization completed.
   */
  private void stopProcedureExecutor() {
    if (procedureExecutor != null) {
      procedureExecutor.stop();
    }

    if (procedureStore != null) {
      procedureStore.stop(isAborted());
    }
  }
1169 
1170   private void stopChores() {
1171     if (this.expiredMobFileCleanerChore != null) {
1172       this.expiredMobFileCleanerChore.cancel(true);
1173     }
1174     if (this.mobCompactChore != null) {
1175       this.mobCompactChore.cancel(true);
1176     }
1177     if (this.balancerChore != null) {
1178       this.balancerChore.cancel(true);
1179     }
1180     if (this.normalizerChore != null) {
1181       this.normalizerChore.cancel(true);
1182     }
1183     if (this.clusterStatusChore != null) {
1184       this.clusterStatusChore.cancel(true);
1185     }
1186     if (this.catalogJanitorChore != null) {
1187       this.catalogJanitorChore.cancel(true);
1188     }
1189     if (this.clusterStatusPublisherChore != null){
1190       clusterStatusPublisherChore.cancel(true);
1191     }
1192     if (this.mobCompactThread != null) {
1193       this.mobCompactThread.close();
1194     }
1195   }
1196 
1197   /**
1198    * @return Get remote side's InetAddress
1199    */
1200   InetAddress getRemoteInetAddress(final int port,
1201       final long serverStartCode) throws UnknownHostException {
1202     // Do it out here in its own little method so can fake an address when
1203     // mocking up in tests.
1204     InetAddress ia = RpcServer.getRemoteIp();
1205 
1206     // The call could be from the local regionserver,
1207     // in which case, there is no remote address.
1208     if (ia == null && serverStartCode == startcode) {
1209       InetSocketAddress isa = rpcServices.getSocketAddress();
1210       if (isa != null && isa.getPort() == port) {
1211         ia = isa.getAddress();
1212       }
1213     }
1214     return ia;
1215   }
1216 
1217   /**
1218    * @return Maximum time we should run balancer for
1219    */
1220   private int getBalancerCutoffTime() {
1221     int balancerCutoffTime = getConfiguration().getInt("hbase.balancer.max.balancing", -1);
1222     if (balancerCutoffTime == -1) {
1223       // if cutoff time isn't set, defaulting it to period time
1224       int balancerPeriod = getConfiguration().getInt("hbase.balancer.period", 300000);
1225       balancerCutoffTime = balancerPeriod;
1226     }
1227     return balancerCutoffTime;
1228   }
1229 
  /**
   * Run the balancer without forcing; see {@link #balance(boolean)}.
   * @return whether a balancing run completed (or was unnecessary)
   * @throws IOException if the balancing run fails
   */
  public boolean balance() throws IOException {
    return balance(false);
  }
1233 
  /**
   * Run the load balancer once, computing per-table region plans and executing them until
   * done or the cutoff time is reached.
   * @param force run even when regions (other than hbase:meta) are in transition
   * @return false if balancing was skipped (master not initialized, balancer switched off,
   *         regions in transition without force, dead servers being processed, or
   *         coprocessor bypass); true otherwise, including when no plans were generated
   * @throws IOException if plan computation fails
   */
  public boolean balance(boolean force) throws IOException {
    // if master not initialized, don't run balancer.
    if (!isInitialized()) {
      LOG.debug("Master has not been initialized, don't run balancer.");
      return false;
    }
    // Do this call outside of synchronized block.
    int maximumBalanceTime = getBalancerCutoffTime();
    synchronized (this.balancer) {
      // If balance not true, don't run balancer.
      if (!this.loadBalancerTracker.isBalancerOn()) return false;
      // Only allow one balance run at a time.
      if (this.assignmentManager.getRegionStates().isRegionsInTransition()) {
        Map<String, RegionState> regionsInTransition =
          this.assignmentManager.getRegionStates().getRegionsInTransition();
        // if hbase:meta region is in transition, result of assignment cannot be recorded
        // ignore the force flag in that case
        boolean metaInTransition = assignmentManager.getRegionStates().isMetaRegionInTransition();
        String prefix = force && !metaInTransition ? "R" : "Not r";
        LOG.debug(prefix + "unning balancer because " + regionsInTransition.size() +
          " region(s) in transition: " + org.apache.commons.lang.StringUtils.
            abbreviate(regionsInTransition.toString(), 256));
        if (!force || metaInTransition) return false;
      }
      if (this.serverManager.areDeadServersInProgress()) {
        LOG.debug("Not running balancer because processing dead regionserver(s): " +
          this.serverManager.getDeadServers());
        return false;
      }

      // Give master coprocessors a chance to veto the run.
      if (this.cpHost != null) {
        try {
          if (this.cpHost.preBalance()) {
            LOG.debug("Coprocessor bypassing balancer request");
            return false;
          }
        } catch (IOException ioe) {
          LOG.error("Error invoking master coprocessor preBalance()", ioe);
          return false;
        }
      }

      Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable =
        this.assignmentManager.getRegionStates().getAssignmentsByTable();

      List<RegionPlan> plans = new ArrayList<RegionPlan>();

      //Give the balancer the current cluster state.
      this.balancer.setClusterStatus(getClusterStatus());
      for (Entry<TableName, Map<ServerName, List<HRegionInfo>>> e : assignmentsByTable.entrySet()) {
        List<RegionPlan> partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue());
        if (partialPlans != null) plans.addAll(partialPlans);
      }

      // Execute plans, estimating per-plan time and stopping before the cutoff is exceeded.
      long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
      int rpCount = 0;  // number of RegionPlans balanced so far
      long totalRegPlanExecTime = 0;
      if (plans != null && !plans.isEmpty()) {
        for (RegionPlan plan: plans) {
          LOG.info("balance " + plan);
          long balStartTime = System.currentTimeMillis();
          //TODO: bulk assign
          this.assignmentManager.balance(plan);
          totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;
          rpCount++;
          if (rpCount < plans.size() &&
              // if performing next balance exceeds cutoff time, exit the loop
              (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
            //TODO: After balance, there should not be a cutoff time (keeping it as
            // a security net for now)
            LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
              maximumBalanceTime);
            break;
          }
        }
      }
      if (this.cpHost != null) {
        try {
          this.cpHost.postBalance(rpCount < plans.size() ? plans.subList(0, rpCount) : plans);
        } catch (IOException ioe) {
          // balancing already succeeded so don't change the result
          LOG.error("Error invoking master coprocessor postBalance()", ioe);
        }
      }
    }
    // If LoadBalancer did not generate any plans, it means the cluster is already balanced.
    // Return true indicating a success.
    return true;
  }
1323 
  /** @return the region normalizer in use; exposed for tests. */
  @VisibleForTesting
  public RegionNormalizer getRegionNormalizer() {
    return this.normalizer;
  }
1328 
1329   /**
1330    * Perform normalization of cluster (invoked by {@link RegionNormalizerChore}).
1331    *
1332    * @return true if normalization step was performed successfully, false otherwise
1333    *    (specifically, if HMaster hasn't been initialized properly or normalization
1334    *    is globally disabled)
1335    */
1336   public boolean normalizeRegions() throws IOException {
1337     if (!isInitialized()) {
1338       LOG.debug("Master has not been initialized, don't run region normalizer.");
1339       return false;
1340     }
1341 
1342     if (!this.regionNormalizerTracker.isNormalizerOn()) {
1343       LOG.debug("Region normalization is disabled, don't run region normalizer.");
1344       return false;
1345     }
1346 
1347     synchronized (this.normalizer) {
1348       // Don't run the normalizer concurrently
1349       List<TableName> allEnabledTables = new ArrayList<>(
1350         this.tableStateManager.getTablesInStates(TableState.State.ENABLED));
1351 
1352       Collections.shuffle(allEnabledTables);
1353 
1354       for (TableName table : allEnabledTables) {
1355         TableDescriptor tblDesc = getTableDescriptors().getDescriptor(table);
1356         if (table.isSystemTable() || (tblDesc != null &&
1357             tblDesc.getHTableDescriptor() != null &&
1358             !tblDesc.getHTableDescriptor().isNormalizationEnabled())) {
1359           LOG.debug("Skipping normalization for table: " + table + ", as it's either system"
1360               + " table or doesn't have auto normalization turned on");
1361           continue;
1362         }
1363         List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table);
1364         if (plans != null) {
1365           for (NormalizationPlan plan : plans) {
1366             plan.execute(clusterConnection.getAdmin());
1367             if (plan.getType() == PlanType.SPLIT) {
1368               splitPlanCount++;
1369             } else if (plan.getType() == PlanType.MERGE) {
1370               mergePlanCount++;
1371             }
1372           }
1373         }
1374       }
1375     }
1376     // If Region did not generate any plans, it means the cluster is already balanced.
1377     // Return true indicating a success.
1378     return true;
1379   }
1380 
1381   /**
1382    * @return Client info for use as prefix on an audit log string; who did an action
1383    */
1384   String getClientIdAuditPrefix() {
1385     return "Client=" + RpcServer.getRequestUserName() + "/" + RpcServer.getRemoteAddress();
1386   }
1387 
1388   /**
1389    * Switch for the background CatalogJanitor thread.
1390    * Used for testing.  The thread will continue to run.  It will just be a noop
1391    * if disabled.
1392    * @param b If false, the catalog janitor won't do anything.
1393    */
1394   public void setCatalogJanitorEnabled(final boolean b) {
1395     this.catalogJanitorChore.setEnabled(b);
1396   }
1397 
  /**
   * Submit a merge of two regions to the master's executor service via a
   * {@link DispatchMergingRegionHandler}.  Returns as soon as the handler is
   * submitted; it does not wait for the merge to complete.
   * @param region_a first region to merge
   * @param region_b second region to merge
   * @param forcible whether the merge should be forced (passed through to the handler)
   * @param user user on whose behalf the merge runs
   * @throws IOException if the master has not finished initialization
   */
  @Override
  public void dispatchMergingRegions(final HRegionInfo region_a,
      final HRegionInfo region_b, final boolean forcible, final User user) throws IOException {
    checkInitialized();
    this.service.submit(new DispatchMergingRegionHandler(this,
      this.catalogJanitorChore, region_a, region_b, forcible, user));
  }
1405 
  /**
   * Move the region with the given encoded name to the given destination server, or to a
   * randomly chosen server if {@code destServerName} is null/empty.  The move is skipped
   * (with a debug log) when no destination can be determined, when the region is already
   * on the chosen server, or when the destination is the master itself and the balancer
   * says the region should not live on the master.
   * @param encodedRegionName encoded name of the region to move
   * @param destServerName serialized destination {@link ServerName}, or null/empty to
   *          let the balancer pick a random destination
   * @throws HBaseIOException if the region is unknown, the master is not initialized,
   *           or the underlying assignment fails
   */
  void move(final byte[] encodedRegionName,
      final byte[] destServerName) throws HBaseIOException {
    RegionState regionState = assignmentManager.getRegionStates().
      getRegionState(Bytes.toString(encodedRegionName));

    HRegionInfo hri;
    if (regionState != null) {
      hri = regionState.getRegion();
    } else {
      throw new UnknownRegionException(Bytes.toStringBinary(encodedRegionName));
    }

    ServerName dest;
    if (destServerName == null || destServerName.length == 0) {
      LOG.info("Passed destination servername is null/empty so " +
        "choosing a server at random");
      // Exclude the region's current server from the candidate list.
      final List<ServerName> destServers = this.serverManager.createDestinationServersList(
        regionState.getServerName());
      dest = balancer.randomAssignment(hri, destServers);
      if (dest == null) {
        LOG.debug("Unable to determine a plan to assign " + hri);
        return;
      }
    } else {
      ServerName candidate = ServerName.valueOf(Bytes.toString(destServerName));
      // Let the balancer vet the single requested candidate (it may reject it).
      dest = balancer.randomAssignment(hri, Lists.newArrayList(candidate));
      if (dest == null) {
        LOG.debug("Unable to determine a plan to assign " + hri);
        return;
      }
      if (dest.equals(serverName) && balancer instanceof BaseLoadBalancer
          && !((BaseLoadBalancer)balancer).shouldBeOnMaster(hri)) {
        // To avoid unnecessary region moving later by balancer. Don't put user
        // regions on master. Regions on master could be put on other region
        // server intentionally by test however.
        LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
          + " to avoid unnecessary region moving later by load balancer,"
          + " because it should not be on master");
        return;
      }
    }

    if (dest.equals(regionState.getServerName())) {
      LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
        + " because region already assigned to the same server " + dest + ".");
      return;
    }

    // Now we can do the move
    RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest);

    try {
      checkInitialized();
      // Coprocessor may veto the move by returning true from preMove.
      if (this.cpHost != null) {
        if (this.cpHost.preMove(hri, rp.getSource(), rp.getDestination())) {
          return;
        }
      }
      // warmup the region on the destination before initiating the move. this call
      // is synchronous and takes some time. doing it before the source region gets
      // closed
      serverManager.sendRegionWarmup(rp.getDestination(), hri);

      LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
      this.assignmentManager.balance(rp);
      if (this.cpHost != null) {
        this.cpHost.postMove(hri, rp.getSource(), rp.getDestination());
      }
    } catch (IOException ioe) {
      // Preserve HBaseIOException subtypes; wrap anything else.
      if (ioe instanceof HBaseIOException) {
        throw (HBaseIOException)ioe;
      }
      throw new HBaseIOException(ioe);
    }
  }
1481 
  /**
   * Create a table.  Validates the namespace and descriptor, fires the create-table
   * coprocessor hooks, and submits a {@code CreateTableProcedure}.  Blocks on the
   * prepare latch before returning, then hands back the procedure id so the client
   * can track completion.
   * @param hTableDescriptor descriptor of the table to create
   * @param splitKeys optional pre-split keys (may be null)
   * @param nonceGroup nonce group for idempotent retries
   * @param nonce nonce for idempotent retries
   * @return the procedure id of the submitted CreateTableProcedure
   * @throws IOException if the master is stopped/uninitialized, the namespace does not
   *           exist, or the descriptor fails sanity checks
   */
  @Override
  public long createTable(
      final HTableDescriptor hTableDescriptor,
      final byte [][] splitKeys,
      final long nonceGroup,
      final long nonce) throws IOException {
    if (isStopped()) {
      throw new MasterNotRunningException();
    }
    checkInitialized();
    // Looking up the namespace throws if it does not exist.
    String namespace = hTableDescriptor.getTableName().getNamespaceAsString();
    this.clusterSchemaService.getNamespace(namespace);

    HRegionInfo[] newRegions = ModifyRegionUtils.createHRegionInfos(hTableDescriptor, splitKeys);
    // NOTE(review): checkInitialized() is called a second time here; looks redundant with
    // the call above -- confirm whether it guards against a state change mid-method.
    checkInitialized();
    sanityCheckTableDescriptor(hTableDescriptor);
    if (cpHost != null) {
      cpHost.preCreateTable(hTableDescriptor, newRegions);
    }
    LOG.info(getClientIdAuditPrefix() + " create " + hTableDescriptor);

    // TODO: We can handle/merge duplicate requests, and differentiate the case of
    //       TableExistsException by saying if the schema is the same or not.
    ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
    long procId = this.procedureExecutor.submitProcedure(
      new CreateTableProcedure(
        procedureExecutor.getEnvironment(), hTableDescriptor, newRegions, latch),
      nonceGroup,
      nonce);
    latch.await();

    if (cpHost != null) {
      cpHost.postCreateTable(hTableDescriptor, newRegions);
    }

    return procId;
  }
1519 
1520   /**
1521    * Checks whether the table conforms to some sane limits, and configured
1522    * values (compression, etc) work. Throws an exception if something is wrong.
1523    * @throws IOException
1524    */
1525   private void sanityCheckTableDescriptor(final HTableDescriptor htd) throws IOException {
1526     final String CONF_KEY = "hbase.table.sanity.checks";
1527     boolean logWarn = false;
1528     if (!conf.getBoolean(CONF_KEY, true)) {
1529       logWarn = true;
1530     }
1531     String tableVal = htd.getConfigurationValue(CONF_KEY);
1532     if (tableVal != null && !Boolean.valueOf(tableVal)) {
1533       logWarn = true;
1534     }
1535 
1536     // check max file size
1537     long maxFileSizeLowerLimit = 2 * 1024 * 1024L; // 2M is the default lower limit
1538     long maxFileSize = htd.getMaxFileSize();
1539     if (maxFileSize < 0) {
1540       maxFileSize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, maxFileSizeLowerLimit);
1541     }
1542     if (maxFileSize < conf.getLong("hbase.hregion.max.filesize.limit", maxFileSizeLowerLimit)) {
1543       String message = "MAX_FILESIZE for table descriptor or "
1544           + "\"hbase.hregion.max.filesize\" (" + maxFileSize
1545           + ") is too small, which might cause over splitting into unmanageable "
1546           + "number of regions.";
1547       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1548     }
1549 
1550     // check flush size
1551     long flushSizeLowerLimit = 1024 * 1024L; // 1M is the default lower limit
1552     long flushSize = htd.getMemStoreFlushSize();
1553     if (flushSize < 0) {
1554       flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeLowerLimit);
1555     }
1556     if (flushSize < conf.getLong("hbase.hregion.memstore.flush.size.limit", flushSizeLowerLimit)) {
1557       String message = "MEMSTORE_FLUSHSIZE for table descriptor or "
1558           + "\"hbase.hregion.memstore.flush.size\" ("+flushSize+") is too small, which might cause"
1559           + " very frequent flushing.";
1560       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1561     }
1562 
1563     // check that coprocessors and other specified plugin classes can be loaded
1564     try {
1565       checkClassLoading(conf, htd);
1566     } catch (Exception ex) {
1567       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, ex.getMessage(), null);
1568     }
1569 
1570     // check compression can be loaded
1571     try {
1572       checkCompression(htd);
1573     } catch (IOException e) {
1574       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
1575     }
1576 
1577     // check encryption can be loaded
1578     try {
1579       checkEncryption(conf, htd);
1580     } catch (IOException e) {
1581       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
1582     }
1583     // Verify compaction policy
1584     try{
1585       checkCompactionPolicy(conf, htd);
1586     } catch(IOException e){
1587       warnOrThrowExceptionForFailure(false, CONF_KEY, e.getMessage(), e);
1588     }
1589     // check that we have at least 1 CF
1590     if (htd.getColumnFamilies().length == 0) {
1591       String message = "Table should have at least one column family.";
1592       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1593     }
1594 
1595     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1596       if (hcd.getTimeToLive() <= 0) {
1597         String message = "TTL for column family " + hcd.getNameAsString() + " must be positive.";
1598         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1599       }
1600 
1601       // check blockSize
1602       if (hcd.getBlocksize() < 1024 || hcd.getBlocksize() > 16 * 1024 * 1024) {
1603         String message = "Block size for column family " + hcd.getNameAsString()
1604             + "  must be between 1K and 16MB.";
1605         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1606       }
1607 
1608       // check versions
1609       if (hcd.getMinVersions() < 0) {
1610         String message = "Min versions for column family " + hcd.getNameAsString()
1611           + "  must be positive.";
1612         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1613       }
1614       // max versions already being checked
1615 
1616       // HBASE-13776 Setting illegal versions for HColumnDescriptor
1617       //  does not throw IllegalArgumentException
1618       // check minVersions <= maxVerions
1619       if (hcd.getMinVersions() > hcd.getMaxVersions()) {
1620         String message = "Min versions for column family " + hcd.getNameAsString()
1621             + " must be less than the Max versions.";
1622         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1623       }
1624 
1625       // check replication scope
1626       if (hcd.getScope() < 0) {
1627         String message = "Replication scope for column family "
1628           + hcd.getNameAsString() + "  must be positive.";
1629         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1630       }
1631 
1632       // check data replication factor, it can be 0(default value) when user has not explicitly
1633       // set the value, in this case we use default replication factor set in the file system.
1634       if (hcd.getDFSReplication() < 0) {
1635         String message = "HFile Replication for column family " + hcd.getNameAsString()
1636             + "  must be greater than zero.";
1637         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1638       }
1639 
1640       // TODO: should we check coprocessors and encryption ?
1641     }
1642   }
1643 
1644   private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd)
1645       throws IOException {
1646     // FIFO compaction has some requirements
1647     // Actually FCP ignores periodic major compactions
1648     String className =
1649         htd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
1650     if (className == null) {
1651       className =
1652           conf.get(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
1653             ExploringCompactionPolicy.class.getName());
1654     }
1655 
1656     int blockingFileCount = HStore.DEFAULT_BLOCKING_STOREFILE_COUNT;
1657     String sv = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
1658     if (sv != null) {
1659       blockingFileCount = Integer.parseInt(sv);
1660     } else {
1661       blockingFileCount = conf.getInt(HStore.BLOCKING_STOREFILES_KEY, blockingFileCount);
1662     }
1663 
1664     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1665       String compactionPolicy =
1666           hcd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
1667       if (compactionPolicy == null) {
1668         compactionPolicy = className;
1669       }
1670       if (!compactionPolicy.equals(FIFOCompactionPolicy.class.getName())) {
1671         continue;
1672       }
1673       // FIFOCompaction
1674       String message = null;
1675 
1676       // 1. Check TTL
1677       if (hcd.getTimeToLive() == HColumnDescriptor.DEFAULT_TTL) {
1678         message = "Default TTL is not supported for FIFO compaction";
1679         throw new IOException(message);
1680       }
1681 
1682       // 2. Check min versions
1683       if (hcd.getMinVersions() > 0) {
1684         message = "MIN_VERSION > 0 is not supported for FIFO compaction";
1685         throw new IOException(message);
1686       }
1687 
1688       // 3. blocking file count
1689       String sbfc = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
1690       if (sbfc != null) {
1691         blockingFileCount = Integer.parseInt(sbfc);
1692       }
1693       if (blockingFileCount < 1000) {
1694         message =
1695             "blocking file count '" + HStore.BLOCKING_STOREFILES_KEY + "' " + blockingFileCount
1696                 + " is below recommended minimum of 1000";
1697         throw new IOException(message);
1698       }
1699     }
1700   }
1701 
1702   // HBASE-13350 - Helper method to log warning on sanity check failures if checks disabled.
1703   private static void warnOrThrowExceptionForFailure(boolean logWarn, String confKey,
1704       String message, Exception cause) throws IOException {
1705     if (!logWarn) {
1706       throw new DoNotRetryIOException(message + " Set " + confKey +
1707           " to false at conf or table descriptor if you want to bypass sanity checks", cause);
1708     }
1709     LOG.warn(message);
1710   }
1711 
  /**
   * Register this master as a backup master in ZooKeeper, then spin up a daemon thread
   * that blocks until this master becomes the active one and, if it does, runs
   * {@code finishActiveMasterInitialization}.  Returns immediately after starting the
   * thread; any failure to become active aborts the master.
   * @param infoPort the info-server port to publish alongside the master address
   * @throws KeeperException if the backup-master ZNode cannot be created
   */
  private void startActiveMasterManager(int infoPort) throws KeeperException {
    String backupZNode = ZKUtil.joinZNode(
      zooKeeper.backupMasterAddressesZNode, serverName.toString());
    /*
    * Add a ZNode for ourselves in the backup master directory since we
    * may not become the active master. If so, we want the actual active
    * master to know we are backup masters, so that it won't assign
    * regions to us if so configured.
    *
    * If we become the active master later, ActiveMasterManager will delete
    * this node explicitly.  If we crash before then, ZooKeeper will delete
    * this node for us since it is ephemeral.
    */
    LOG.info("Adding backup master ZNode " + backupZNode);
    if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode,
        serverName, infoPort)) {
      LOG.warn("Failed create of " + backupZNode + " by " + serverName);
    }

    activeMasterManager.setInfoPort(infoPort);
    // Start a thread to try to become the active master, so we won't block here
    Threads.setDaemonThreadRunning(new Thread(new Runnable() {
      @Override
      public void run() {
        int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT,
          HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
        // If we're a backup master, stall until a primary to writes his address
        if (conf.getBoolean(HConstants.MASTER_TYPE_BACKUP,
          HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
          LOG.debug("HMaster started in backup mode. "
            + "Stalling until master znode is written.");
          // This will only be a minute or so while the cluster starts up,
          // so don't worry about setting watches on the parent znode
          while (!activeMasterManager.hasActiveMaster()) {
            LOG.debug("Waiting for master address ZNode to be written "
              + "(Also watching cluster state node)");
            Threads.sleep(timeout);
          }
        }
        MonitoredTask status = TaskMonitor.get().createStatus("Master startup");
        status.setDescription("Master startup");
        try {
          // Blocks until this master wins the active-master race (or gives up).
          if (activeMasterManager.blockUntilBecomingActiveMaster(timeout, status)) {
            finishActiveMasterInitialization(status);
          }
        } catch (Throwable t) {
          status.setStatus("Failed to become active: " + t.getMessage());
          LOG.fatal("Failed to become active master", t);
          // HBASE-5680: Likely hadoop23 vs hadoop 20.x/1.x incompatibility
          if (t instanceof NoClassDefFoundError &&
            t.getMessage()
              .contains("org/apache/hadoop/hdfs/protocol/HdfsConstants$SafeModeAction")) {
            // improved error message for this special case
            abort("HBase is having a problem with its Hadoop jars.  You may need to "
              + "recompile HBase against Hadoop version "
              + org.apache.hadoop.util.VersionInfo.getVersion()
              + " or change your hadoop jars to start properly", t);
          } else {
            abort("Unhandled exception. Starting shutdown.", t);
          }
        } finally {
          status.cleanup();
        }
      }
    }, getServerName().toShortString() + ".activeMasterManager"));
  }
1778 
1779   private void checkCompression(final HTableDescriptor htd)
1780   throws IOException {
1781     if (!this.masterCheckCompression) return;
1782     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1783       checkCompression(hcd);
1784     }
1785   }
1786 
1787   private void checkCompression(final HColumnDescriptor hcd)
1788   throws IOException {
1789     if (!this.masterCheckCompression) return;
1790     CompressionTest.testCompression(hcd.getCompressionType());
1791     CompressionTest.testCompression(hcd.getCompactionCompressionType());
1792   }
1793 
1794   private void checkEncryption(final Configuration conf, final HTableDescriptor htd)
1795   throws IOException {
1796     if (!this.masterCheckEncryption) return;
1797     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1798       checkEncryption(conf, hcd);
1799     }
1800   }
1801 
1802   private void checkEncryption(final Configuration conf, final HColumnDescriptor hcd)
1803   throws IOException {
1804     if (!this.masterCheckEncryption) return;
1805     EncryptionTest.testEncryption(conf, hcd.getEncryptionType(), hcd.getEncryptionKey());
1806   }
1807 
  /**
   * Verify that the table's split-policy class and table coprocessor classes can be
   * resolved and loaded; both calls throw if a configured class is unusable.
   */
  private void checkClassLoading(final Configuration conf, final HTableDescriptor htd)
  throws IOException {
    RegionSplitPolicy.getSplitPolicyClass(htd, conf);
    RegionCoprocessorHost.testTableCoprocessorAttrs(conf, htd);
  }
1813 
  /** @return true if the given table is the hbase:meta catalog table. */
  private static boolean isCatalogTable(final TableName tableName) {
    return tableName.equals(TableName.META_TABLE_NAME);
  }
1817 
  /**
   * Delete a table.  Fires the delete-table coprocessor hooks and submits a
   * {@code DeleteTableProcedure}; blocks on the prepare latch before returning.
   * @param tableName table to delete
   * @param nonceGroup nonce group for idempotent retries
   * @param nonce nonce for idempotent retries
   * @return the procedure id of the submitted DeleteTableProcedure
   * @throws IOException if the master is not initialized or the delete cannot be prepared
   */
  @Override
  public long deleteTable(
      final TableName tableName,
      final long nonceGroup,
      final long nonce) throws IOException {
    checkInitialized();
    if (cpHost != null) {
      cpHost.preDeleteTable(tableName);
    }
    LOG.info(getClientIdAuditPrefix() + " delete " + tableName);

    // TODO: We can handle/merge duplicate request
    ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
    long procId = this.procedureExecutor.submitProcedure(
      new DeleteTableProcedure(procedureExecutor.getEnvironment(), tableName, latch),
      nonceGroup,
      nonce);
    latch.await();

    if (cpHost != null) {
      cpHost.postDeleteTable(tableName);
    }

    return procId;
  }
1843 
  /**
   * Truncate a table.  Unlike create/delete, this waits for the whole
   * {@code TruncateTableProcedure} to complete before returning.
   * @param tableName table to truncate
   * @param preserveSplits whether to keep the existing region split points
   * @param nonceGroup nonce group for idempotent retries
   * @param nonce nonce for idempotent retries
   * @return the procedure id of the completed TruncateTableProcedure
   * @throws IOException if the master is not initialized or the procedure fails
   */
  @Override
  public long truncateTable(
      final TableName tableName,
      final boolean preserveSplits,
      final long nonceGroup,
      final long nonce) throws IOException {
    checkInitialized();
    if (cpHost != null) {
      cpHost.preTruncateTable(tableName);
    }
    LOG.info(getClientIdAuditPrefix() + " truncate " + tableName);

    long procId = this.procedureExecutor.submitProcedure(
      new TruncateTableProcedure(procedureExecutor.getEnvironment(), tableName, preserveSplits),
      nonceGroup,
      nonce);
    // Synchronous: block until the truncate finishes (or rethrow its failure).
    ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);

    if (cpHost != null) {
      cpHost.postTruncateTable(tableName);
    }
    return procId;
  }
1867 
1868   @Override
1869   public long addColumn(
1870       final TableName tableName,
1871       final HColumnDescriptor columnDescriptor,
1872       final long nonceGroup,
1873       final long nonce)
1874       throws IOException {
1875     checkInitialized();
1876     checkCompression(columnDescriptor);
1877     checkEncryption(conf, columnDescriptor);
1878     if (cpHost != null) {
1879       if (cpHost.preAddColumn(tableName, columnDescriptor)) {
1880         return -1;
1881       }
1882     }
1883     // Execute the operation synchronously - wait for the operation to complete before continuing.
1884     long procId = this.procedureExecutor.submitProcedure(
1885       new AddColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, columnDescriptor),
1886       nonceGroup,
1887       nonce);
1888     ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1889     if (cpHost != null) {
1890       cpHost.postAddColumn(tableName, columnDescriptor);
1891     }
1892     return procId;
1893   }
1894 
  /**
   * Modify an existing column family via a {@code ModifyColumnFamilyProcedure},
   * waiting for the procedure to complete before returning.
   * @param tableName table containing the family
   * @param descriptor new descriptor for the column family
   * @param nonceGroup nonce group for idempotent retries
   * @param nonce nonce for idempotent retries
   * @return the procedure id, or -1 if a coprocessor bypassed the operation
   * @throws IOException if the master is not initialized, the descriptor's compression
   *           or encryption settings are invalid, or the procedure fails
   */
  @Override
  public long modifyColumn(
      final TableName tableName,
      final HColumnDescriptor descriptor,
      final long nonceGroup,
      final long nonce)
      throws IOException {
    checkInitialized();
    checkCompression(descriptor);
    checkEncryption(conf, descriptor);
    if (cpHost != null) {
      if (cpHost.preModifyColumn(tableName, descriptor)) {
        return -1;
      }
    }
    LOG.info(getClientIdAuditPrefix() + " modify " + descriptor);

    // Execute the operation synchronously - wait for the operation to complete before continuing.
    long procId = this.procedureExecutor.submitProcedure(
      new ModifyColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, descriptor),
      nonceGroup,
      nonce);
    ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);

    if (cpHost != null) {
      cpHost.postModifyColumn(tableName, descriptor);
    }
    return procId;
  }
1924 
  /**
   * Delete a column family via a {@code DeleteColumnFamilyProcedure}, waiting for the
   * procedure to complete before returning.
   * @param tableName table containing the family
   * @param columnName name of the column family to delete
   * @param nonceGroup nonce group for idempotent retries
   * @param nonce nonce for idempotent retries
   * @return the procedure id, or -1 if a coprocessor bypassed the operation
   * @throws IOException if the master is not initialized or the procedure fails
   */
  @Override
  public long deleteColumn(
      final TableName tableName,
      final byte[] columnName,
      final long nonceGroup,
      final long nonce)
      throws IOException {
    checkInitialized();
    if (cpHost != null) {
      if (cpHost.preDeleteColumn(tableName, columnName)) {
        return -1;
      }
    }
    LOG.info(getClientIdAuditPrefix() + " delete " + Bytes.toString(columnName));

    // Execute the operation synchronously - wait for the operation to complete before continuing.
    long procId = this.procedureExecutor.submitProcedure(
      new DeleteColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, columnName),
      nonceGroup,
      nonce);
    ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);

    if (cpHost != null) {
      cpHost.postDeleteColumn(tableName, columnName);
    }
    return procId;
  }
1952 
  /**
   * Enable a table via an {@code EnableTableProcedure}.  The procedure runs
   * asynchronously; this method only blocks until the operation is prepared
   * (table locked, state set), then returns the procedure id for progress tracking.
   * @param tableName table to enable
   * @param nonceGroup nonce group for idempotent retries
   * @param nonce nonce for idempotent retries
   * @return the procedure id of the submitted EnableTableProcedure
   * @throws IOException if the master is not initialized or preparation fails
   */
  @Override
  public long enableTable(
      final TableName tableName,
      final long nonceGroup,
      final long nonce) throws IOException {
    checkInitialized();
    if (cpHost != null) {
      cpHost.preEnableTable(tableName);
    }
    LOG.info(getClientIdAuditPrefix() + " enable " + tableName);

    // Execute the operation asynchronously - client will check the progress of the operation
    final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
    long procId = this.procedureExecutor.submitProcedure(
      new EnableTableProcedure(procedureExecutor.getEnvironment(), tableName, false, prepareLatch),
      nonceGroup,
      nonce);
    // Before returning to client, we want to make sure that the table is prepared to be
    // enabled (the table is locked and the table state is set).
    //
    // Note: if the procedure throws exception, we will catch it and rethrow.
    prepareLatch.await();

    if (cpHost != null) {
      cpHost.postEnableTable(tableName);
    }

    return procId;
  }
1982 
  /**
   * Disable a table via a {@code DisableTableProcedure}.  The procedure runs
   * asynchronously; this method only blocks until the operation is prepared
   * (table locked, state set), then returns the procedure id for progress tracking.
   * @param tableName table to disable
   * @param nonceGroup nonce group for idempotent retries
   * @param nonce nonce for idempotent retries
   * @return the procedure id of the submitted DisableTableProcedure
   * @throws IOException if the master is not initialized or preparation fails
   */
  @Override
  public long disableTable(
      final TableName tableName,
      final long nonceGroup,
      final long nonce) throws IOException {
    checkInitialized();
    if (cpHost != null) {
      cpHost.preDisableTable(tableName);
    }
    LOG.info(getClientIdAuditPrefix() + " disable " + tableName);

    // Execute the operation asynchronously - client will check the progress of the operation
    final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
    long procId = this.procedureExecutor.submitProcedure(
      new DisableTableProcedure(procedureExecutor.getEnvironment(), tableName, false, prepareLatch),
      nonceGroup,
      nonce);
    // Before returning to client, we want to make sure that the table is prepared to be
    // disabled (the table is locked and the table state is set).
    //
    // Note: if the procedure throws exception, we will catch it and rethrow.
    prepareLatch.await();

    if (cpHost != null) {
      cpHost.postDisableTable(tableName);
    }

    return procId;
  }
2013 
2014   /**
2015    * Return the region and current deployment for the region containing
2016    * the given row. If the region cannot be found, returns null. If it
2017    * is found, but not currently deployed, the second element of the pair
2018    * may be null.
2019    */
2020   @VisibleForTesting // Used by TestMaster.
2021   Pair<HRegionInfo, ServerName> getTableRegionForRow(
2022       final TableName tableName, final byte [] rowKey)
2023   throws IOException {
2024     final AtomicReference<Pair<HRegionInfo, ServerName>> result =
2025       new AtomicReference<Pair<HRegionInfo, ServerName>>(null);
2026 
2027     MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
2028         @Override
2029         public boolean visit(Result data) throws IOException {
2030           if (data == null || data.size() <= 0) {
2031             return true;
2032           }
2033           Pair<HRegionInfo, ServerName> pair =
2034               new Pair(MetaTableAccessor.getHRegionInfo(data),
2035                   MetaTableAccessor.getServerName(data,0));
2036           if (pair == null) {
2037             return false;
2038           }
2039           if (!pair.getFirst().getTable().equals(tableName)) {
2040             return false;
2041           }
2042           result.set(pair);
2043           return true;
2044         }
2045     };
2046 
2047     MetaTableAccessor.scanMeta(clusterConnection, visitor, tableName, rowKey, 1);
2048     return result.get();
2049   }
2050 
  /**
   * Modify a table's descriptor via a {@code ModifyTableProcedure}, waiting for the
   * procedure to complete before returning.  The new descriptor is sanity-checked
   * first (same checks as table creation).
   * @param tableName table to modify
   * @param descriptor new table descriptor
   * @param nonceGroup nonce group for idempotent retries
   * @param nonce nonce for idempotent retries
   * @return the procedure id of the completed ModifyTableProcedure
   * @throws IOException if the master is not initialized, the descriptor fails sanity
   *           checks, or the procedure fails
   */
  @Override
  public long modifyTable(
      final TableName tableName,
      final HTableDescriptor descriptor,
      final long nonceGroup,
      final long nonce)
      throws IOException {
    checkInitialized();
    sanityCheckTableDescriptor(descriptor);
    if (cpHost != null) {
      cpHost.preModifyTable(tableName, descriptor);
    }

    LOG.info(getClientIdAuditPrefix() + " modify " + tableName);

    // Execute the operation synchronously - wait for the operation completes before continuing.
    long procId = this.procedureExecutor.submitProcedure(
      new ModifyTableProcedure(procedureExecutor.getEnvironment(), descriptor),
      nonceGroup,
      nonce);

    ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);

    if (cpHost != null) {
      cpHost.postModifyTable(tableName, descriptor);
    }

    return procId;
  }
2080 
  /**
   * Ensure the given table may be modified: it must not be a catalog table, it must
   * exist in hbase:meta, and it must currently be in DISABLED state.
   * @throws IOException if the table is a catalog table
   * @throws TableNotFoundException if the table does not exist
   * @throws TableNotDisabledException if the table is not disabled
   */
  @Override
  public void checkTableModifiable(final TableName tableName)
      throws IOException, TableNotFoundException, TableNotDisabledException {
    if (isCatalogTable(tableName)) {
      throw new IOException("Can't modify catalog tables");
    }
    if (!MetaTableAccessor.tableExists(getConnection(), tableName)) {
      throw new TableNotFoundException(tableName);
    }
    if (!getAssignmentManager().getTableStateManager().
        isTableState(tableName, TableState.State.DISABLED)) {
      throw new TableNotDisabledException(tableName);
    }
  }
2095 
2096   /**
2097    * @return cluster status
2098    */
2099   public ClusterStatus getClusterStatus() throws InterruptedIOException {
2100     // Build Set of backup masters from ZK nodes
2101     List<String> backupMasterStrings;
2102     try {
2103       backupMasterStrings = ZKUtil.listChildrenNoWatch(this.zooKeeper,
2104         this.zooKeeper.backupMasterAddressesZNode);
2105     } catch (KeeperException e) {
2106       LOG.warn(this.zooKeeper.prefix("Unable to list backup servers"), e);
2107       backupMasterStrings = null;
2108     }
2109 
2110     List<ServerName> backupMasters = null;
2111     if (backupMasterStrings != null && !backupMasterStrings.isEmpty()) {
2112       backupMasters = new ArrayList<ServerName>(backupMasterStrings.size());
2113       for (String s: backupMasterStrings) {
2114         try {
2115           byte [] bytes;
2116           try {
2117             bytes = ZKUtil.getData(this.zooKeeper, ZKUtil.joinZNode(
2118                 this.zooKeeper.backupMasterAddressesZNode, s));
2119           } catch (InterruptedException e) {
2120             throw new InterruptedIOException();
2121           }
2122           if (bytes != null) {
2123             ServerName sn;
2124             try {
2125               sn = ServerName.parseFrom(bytes);
2126             } catch (DeserializationException e) {
2127               LOG.warn("Failed parse, skipping registering backup server", e);
2128               continue;
2129             }
2130             backupMasters.add(sn);
2131           }
2132         } catch (KeeperException e) {
2133           LOG.warn(this.zooKeeper.prefix("Unable to get information about " +
2134                    "backup servers"), e);
2135         }
2136       }
2137       Collections.sort(backupMasters, new Comparator<ServerName>() {
2138         @Override
2139         public int compare(ServerName s1, ServerName s2) {
2140           return s1.getServerName().compareTo(s2.getServerName());
2141         }});
2142     }
2143 
2144     String clusterId = fileSystemManager != null ?
2145       fileSystemManager.getClusterId().toString() : null;
2146     Map<String, RegionState> regionsInTransition = assignmentManager != null ?
2147       assignmentManager.getRegionStates().getRegionsInTransition() : null;
2148     String[] coprocessors = cpHost != null ? getMasterCoprocessors() : null;
2149     boolean balancerOn = loadBalancerTracker != null ?
2150       loadBalancerTracker.isBalancerOn() : false;
2151     Map<ServerName, ServerLoad> onlineServers = null;
2152     Set<ServerName> deadServers = null;
2153     if (serverManager != null) {
2154       deadServers = serverManager.getDeadServers().copyServerNames();
2155       onlineServers = serverManager.getOnlineServers();
2156     }
2157     return new ClusterStatus(VersionInfo.getVersion(), clusterId,
2158       onlineServers, deadServers, serverName, backupMasters,
2159       regionsInTransition, coprocessors, balancerOn);
2160   }
2161 
2162   /**
2163    * The set of loaded coprocessors is stored in a static set. Since it's
2164    * statically allocated, it does not require that HMaster's cpHost be
2165    * initialized prior to accessing it.
2166    * @return a String representation of the set of names of the loaded coprocessors.
2167    */
2168   public static String getLoadedCoprocessors() {
2169     return CoprocessorHost.getLoadedCoprocessors().toString();
2170   }
2171 
2172   /**
2173    * @return timestamp in millis when HMaster was started.
2174    */
2175   public long getMasterStartTime() {
2176     return startcode;
2177   }
2178 
2179   /**
2180    * @return timestamp in millis when HMaster became the active master.
2181    */
2182   public long getMasterActiveTime() {
2183     return masterActiveTime;
2184   }
2185 
2186   public int getNumWALFiles() {
2187     return procedureStore != null ? procedureStore.getActiveLogs().size() : 0;
2188   }
2189 
2190   public WALProcedureStore getWalProcedureStore() {
2191     return procedureStore;
2192   }
2193 
2194   public int getRegionServerInfoPort(final ServerName sn) {
2195     RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
2196     if (info == null || info.getInfoPort() == 0) {
2197       return conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
2198         HConstants.DEFAULT_REGIONSERVER_INFOPORT);
2199     }
2200     return info.getInfoPort();
2201   }
2202 
2203   public String getRegionServerVersion(final ServerName sn) {
2204     RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
2205     if (info != null && info.hasVersionInfo()) {
2206       return info.getVersionInfo().getVersion();
2207     }
2208     return "Unknown";
2209   }
2210 
2211   /**
2212    * @return array of coprocessor SimpleNames.
2213    */
2214   public String[] getMasterCoprocessors() {
2215     Set<String> masterCoprocessors = getMasterCoprocessorHost().getCoprocessors();
2216     return masterCoprocessors.toArray(new String[masterCoprocessors.size()]);
2217   }
2218 
2219   @Override
2220   public void abort(final String msg, final Throwable t) {
2221     if (isAborted() || isStopped()) {
2222       return;
2223     }
2224     if (cpHost != null) {
2225       // HBASE-4014: dump a list of loaded coprocessors.
2226       LOG.fatal("Master server abort: loaded coprocessors are: " +
2227           getLoadedCoprocessors());
2228     }
2229     if (t != null) LOG.fatal(msg, t);
2230     stopMaster();
2231   }
2232 
2233   @Override
2234   public ZooKeeperWatcher getZooKeeper() {
2235     return zooKeeper;
2236   }
2237 
2238   @Override
2239   public MasterCoprocessorHost getMasterCoprocessorHost() {
2240     return cpHost;
2241   }
2242 
2243   @Override
2244   public MasterQuotaManager getMasterQuotaManager() {
2245     return quotaManager;
2246   }
2247 
2248   @Override
2249   public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
2250     return procedureExecutor;
2251   }
2252 
2253   @Override
2254   public ServerName getServerName() {
2255     return this.serverName;
2256   }
2257 
2258   @Override
2259   public AssignmentManager getAssignmentManager() {
2260     return this.assignmentManager;
2261   }
2262 
2263   public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
2264     return rsFatals;
2265   }
2266 
  /**
   * Shut the cluster down: run the preShutdown coprocessor hook, have the
   * server manager shut down the cluster, and mark the cluster as down in
   * ZooKeeper. Hook/ZK failures are logged and do not stop the sequence.
   */
  public void shutdown() {
    if (cpHost != null) {
      try {
        cpHost.preShutdown();
      } catch (IOException ioe) {
        // Best effort: a failing coprocessor must not block shutdown.
        LOG.error("Error call master coprocessor preShutdown()", ioe);
      }
    }

    if (this.serverManager != null) {
      this.serverManager.shutdownCluster();
    }
    if (this.clusterStatusTracker != null){
      try {
        // Flip cluster state in ZK so it is observed as down.
        this.clusterStatusTracker.setClusterDown();
      } catch (KeeperException e) {
        LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
      }
    }
  }
2287 
  /**
   * Stop this master process: run the preStopMaster coprocessor hook, then
   * stop the server, recording which thread requested the stop.
   */
  public void stopMaster() {
    if (cpHost != null) {
      try {
        cpHost.preStopMaster();
      } catch (IOException ioe) {
        // Best effort: a failing coprocessor must not prevent the stop.
        LOG.error("Error call master coprocessor preStopMaster()", ioe);
      }
    }
    stop("Stopped by " + Thread.currentThread().getName());
  }
2298 
2299   void checkServiceStarted() throws ServerNotRunningYetException {
2300     if (!serviceStarted) {
2301       throw new ServerNotRunningYetException("Server is not running yet");
2302     }
2303   }
2304 
2305   void checkInitialized() throws PleaseHoldException, ServerNotRunningYetException {
2306     checkServiceStarted();
2307     if (!isInitialized()) throw new PleaseHoldException("Master is initializing");
2308   }
2309 
2310   /**
2311    * Report whether this master is currently the active master or not.
2312    * If not active master, we are parked on ZK waiting to become active.
2313    *
2314    * This method is used for testing.
2315    *
2316    * @return true if active master, false if not.
2317    */
2318   public boolean isActiveMaster() {
2319     return isActiveMaster;
2320   }
2321 
2322   /**
2323    * Report whether this master has completed with its initialization and is
2324    * ready.  If ready, the master is also the active master.  A standby master
2325    * is never ready.
2326    *
2327    * This method is used for testing.
2328    *
2329    * @return true if master is ready to go, false if not.
2330    */
2331   @Override
2332   public boolean isInitialized() {
2333     return initialized.isReady();
2334   }
2335 
2336   @VisibleForTesting
2337   public void setInitialized(boolean isInitialized) {
2338     procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized);
2339   }
2340 
2341   public ProcedureEvent getInitializedEvent() {
2342     return initialized;
2343   }
2344 
2345   /**
2346    * ServerCrashProcessingEnabled is set false before completing assignMeta to prevent processing
2347    * of crashed servers.
2348    * @return true if assignMeta has completed;
2349    */
2350   @Override
2351   public boolean isServerCrashProcessingEnabled() {
2352     return serverCrashProcessingEnabled.isReady();
2353   }
2354 
2355   @VisibleForTesting
2356   public void setServerCrashProcessingEnabled(final boolean b) {
2357     procedureExecutor.getEnvironment().setEventReady(serverCrashProcessingEnabled, b);
2358   }
2359 
2360   public ProcedureEvent getServerCrashProcessingEnabledEvent() {
2361     return serverCrashProcessingEnabled;
2362   }
2363 
2364   /**
2365    * Report whether this master has started initialization and is about to do meta region assignment
2366    * @return true if master is in initialization &amp; about to assign hbase:meta regions
2367    */
2368   public boolean isInitializationStartsMetaRegionAssignment() {
2369     return this.initializationBeforeMetaAssignment;
2370   }
2371 
2372   public void assignRegion(HRegionInfo hri) {
2373     assignmentManager.assign(hri);
2374   }
2375 
2376   /**
2377    * Compute the average load across all region servers.
2378    * Currently, this uses a very naive computation - just uses the number of
2379    * regions being served, ignoring stats about number of requests.
2380    * @return the average load
2381    */
2382   public double getAverageLoad() {
2383     if (this.assignmentManager == null) {
2384       return 0;
2385     }
2386 
2387     RegionStates regionStates = this.assignmentManager.getRegionStates();
2388     if (regionStates == null) {
2389       return 0;
2390     }
2391     return regionStates.getAverageLoad();
2392   }
2393 
2394   /*
2395    * @return the count of region split plans executed
2396    */
2397   public long getSplitPlanCount() {
2398     return splitPlanCount;
2399   }
2400 
2401   /*
2402    * @return the count of region merge plans executed
2403    */
2404   public long getMergePlanCount() {
2405     return mergePlanCount;
2406   }
2407 
2408   @Override
2409   public boolean registerService(Service instance) {
2410     /*
2411      * No stacking of instances is allowed for a single service name
2412      */
2413     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
2414     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
2415       LOG.error("Coprocessor service "+serviceDesc.getFullName()+
2416           " already registered, rejecting request from "+instance
2417       );
2418       return false;
2419     }
2420 
2421     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
2422     if (LOG.isDebugEnabled()) {
2423       LOG.debug("Registered master coprocessor service: service="+serviceDesc.getFullName());
2424     }
2425     return true;
2426   }
2427 
2428   /**
2429    * Utility for constructing an instance of the passed HMaster class.
2430    * @param masterClass
2431    * @return HMaster instance.
2432    */
2433   public static HMaster constructMaster(Class<? extends HMaster> masterClass,
2434       final Configuration conf, final CoordinatedStateManager cp)  {
2435     try {
2436       Constructor<? extends HMaster> c =
2437         masterClass.getConstructor(Configuration.class, CoordinatedStateManager.class);
2438       return c.newInstance(conf, cp);
2439     } catch(Exception e) {
2440       Throwable error = e;
2441       if (e instanceof InvocationTargetException &&
2442           ((InvocationTargetException)e).getTargetException() != null) {
2443         error = ((InvocationTargetException)e).getTargetException();
2444       }
2445       throw new RuntimeException("Failed construction of Master: " + masterClass.toString() + ". "
2446         , error);
2447     }
2448   }
2449 
2450   /**
2451    * @see org.apache.hadoop.hbase.master.HMasterCommandLine
2452    */
2453   public static void main(String [] args) {
2454     VersionInfo.logVersion();
2455     new HMasterCommandLine(HMaster.class).doMain(args);
2456   }
2457 
2458   public HFileCleaner getHFileCleaner() {
2459     return this.hfileCleaner;
2460   }
2461 
2462   /**
2463    * @return the underlying snapshot manager
2464    */
2465   public SnapshotManager getSnapshotManager() {
2466     return this.snapshotManager;
2467   }
2468 
2469   /**
2470    * @return the underlying MasterProcedureManagerHost
2471    */
2472   public MasterProcedureManagerHost getMasterProcedureManagerHost() {
2473     return mpmHost;
2474   }
2475 
2476   @Override
2477   public ClusterSchema getClusterSchema() {
2478     return this.clusterSchemaService;
2479   }
2480 
2481   /**
2482    * Create a new Namespace.
2483    * @param namespaceDescriptor descriptor for new Namespace
2484    * @param nonceGroup Identifier for the source of the request, a client or process.
2485    * @param nonce A unique identifier for this operation from the client or process identified by
2486    * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
2487    * @return procedure id
2488    */
2489   long createNamespace(final NamespaceDescriptor namespaceDescriptor, final long nonceGroup,
2490       final long nonce)
2491   throws IOException {
2492     checkInitialized();
2493     TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName()));
2494     if (this.cpHost != null && this.cpHost.preCreateNamespace(namespaceDescriptor)) {
2495       throw new BypassCoprocessorException();
2496     }
2497     LOG.info(getClientIdAuditPrefix() + " creating " + namespaceDescriptor);
2498     // Execute the operation synchronously - wait for the operation to complete before continuing.
2499     long procId = getClusterSchema().createNamespace(namespaceDescriptor, nonceGroup, nonce);
2500     if (this.cpHost != null) this.cpHost.postCreateNamespace(namespaceDescriptor);
2501     return procId;
2502   }
2503 
2504   /**
2505    * Modify an existing Namespace.
2506    * @param nonceGroup Identifier for the source of the request, a client or process.
2507    * @param nonce A unique identifier for this operation from the client or process identified by
2508    * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
2509    * @return procedure id
2510    */
2511   long modifyNamespace(final NamespaceDescriptor namespaceDescriptor, final long nonceGroup,
2512       final long nonce)
2513   throws IOException {
2514     checkInitialized();
2515     TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName()));
2516     if (this.cpHost != null && this.cpHost.preModifyNamespace(namespaceDescriptor)) {
2517       throw new BypassCoprocessorException();
2518     }
2519     LOG.info(getClientIdAuditPrefix() + " modify " + namespaceDescriptor);
2520     // Execute the operation synchronously - wait for the operation to complete before continuing.
2521     long procId = getClusterSchema().modifyNamespace(namespaceDescriptor, nonceGroup, nonce);
2522     if (this.cpHost != null) this.cpHost.postModifyNamespace(namespaceDescriptor);
2523     return procId;
2524   }
2525 
2526   /**
2527    * Delete an existing Namespace. Only empty Namespaces (no tables) can be removed.
2528    * @param nonceGroup Identifier for the source of the request, a client or process.
2529    * @param nonce A unique identifier for this operation from the client or process identified by
2530    * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
2531    * @return procedure id
2532    */
2533   long deleteNamespace(final String name, final long nonceGroup, final long nonce)
2534   throws IOException {
2535     checkInitialized();
2536     if (this.cpHost != null && this.cpHost.preDeleteNamespace(name)) {
2537       throw new BypassCoprocessorException();
2538     }
2539     LOG.info(getClientIdAuditPrefix() + " delete " + name);
2540     // Execute the operation synchronously - wait for the operation to complete before continuing.
2541     long procId = getClusterSchema().deleteNamespace(name, nonceGroup, nonce);
2542     if (this.cpHost != null) this.cpHost.postDeleteNamespace(name);
2543     return procId;
2544   }
2545 
2546   /**
2547    * Get a Namespace
2548    * @param name Name of the Namespace
2549    * @return Namespace descriptor for <code>name</code>
2550    */
2551   NamespaceDescriptor getNamespace(String name) throws IOException {
2552     checkInitialized();
2553     if (this.cpHost != null) this.cpHost.preGetNamespaceDescriptor(name);
2554     NamespaceDescriptor nsd = this.clusterSchemaService.getNamespace(name);
2555     if (this.cpHost != null) this.cpHost.postGetNamespaceDescriptor(nsd);
2556     return nsd;
2557   }
2558 
2559   /**
2560    * Get all Namespaces
2561    * @return All Namespace descriptors
2562    */
2563   List<NamespaceDescriptor> getNamespaces() throws IOException {
2564     checkInitialized();
2565     final List<NamespaceDescriptor> nsds = new ArrayList<NamespaceDescriptor>();
2566     boolean bypass = false;
2567     if (cpHost != null) {
2568       bypass = cpHost.preListNamespaceDescriptors(nsds);
2569     }
2570     if (!bypass) {
2571       nsds.addAll(this.clusterSchemaService.getNamespaces());
2572       if (this.cpHost != null) this.cpHost.postListNamespaceDescriptors(nsds);
2573     }
2574     return nsds;
2575   }
2576 
  /**
   * List all table names in the given namespace, including system tables.
   * @param name namespace to list
   * @return table names in that namespace
   */
  @Override
  public List<TableName> listTableNamesByNamespace(String name) throws IOException {
    checkInitialized();
    // No regex filter; include system tables.
    return listTableNames(name, null, true);
  }
2582 
  /**
   * List all table descriptors in the given namespace, including system tables.
   * @param name namespace to list
   * @return table descriptors in that namespace
   */
  @Override
  public List<HTableDescriptor> listTableDescriptorsByNamespace(String name) throws IOException {
    checkInitialized();
    // No regex filter, no explicit table list; include system tables.
    return listTableDescriptors(name, null, null, true);
  }
2588 
2589   @Override
2590   public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning)
2591       throws IOException {
2592     if (cpHost != null) {
2593       cpHost.preAbortProcedure(this.procedureExecutor, procId);
2594     }
2595 
2596     final boolean result = this.procedureExecutor.abort(procId, mayInterruptIfRunning);
2597 
2598     if (cpHost != null) {
2599       cpHost.postAbortProcedure();
2600     }
2601 
2602     return result;
2603   }
2604 
2605   @Override
2606   public List<ProcedureInfo> listProcedures() throws IOException {
2607     if (cpHost != null) {
2608       cpHost.preListProcedures();
2609     }
2610 
2611     final List<ProcedureInfo> procInfoList = this.procedureExecutor.listProcedures();
2612 
2613     if (cpHost != null) {
2614       cpHost.postListProcedures(procInfoList);
2615     }
2616 
2617     return procInfoList;
2618   }
2619 
2620   /**
2621    * Returns the list of table descriptors that match the specified request
2622    * @param namespace the namespace to query, or null if querying for all
2623    * @param regex The regular expression to match against, or null if querying for all
2624    * @param tableNameList the list of table names, or null if querying for all
2625    * @param includeSysTables False to match only against userspace tables
2626    * @return the list of table descriptors
2627    */
2628   public List<HTableDescriptor> listTableDescriptors(final String namespace, final String regex,
2629       final List<TableName> tableNameList, final boolean includeSysTables)
2630   throws IOException {
2631     List<HTableDescriptor> htds = new ArrayList<HTableDescriptor>();
2632     boolean bypass = cpHost != null?
2633         cpHost.preGetTableDescriptors(tableNameList, htds, regex): false;
2634     if (!bypass) {
2635       htds = getTableDescriptors(htds, namespace, regex, tableNameList, includeSysTables);
2636       if (cpHost != null) {
2637         cpHost.postGetTableDescriptors(tableNameList, htds, regex);
2638       }
2639     }
2640     return htds;
2641   }
2642 
2643   /**
2644    * Returns the list of table names that match the specified request
2645    * @param regex The regular expression to match against, or null if querying for all
2646    * @param namespace the namespace to query, or null if querying for all
2647    * @param includeSysTables False to match only against userspace tables
2648    * @return the list of table names
2649    */
2650   public List<TableName> listTableNames(final String namespace, final String regex,
2651       final boolean includeSysTables) throws IOException {
2652     List<HTableDescriptor> htds = new ArrayList<HTableDescriptor>();
2653     boolean bypass = cpHost != null? cpHost.preGetTableNames(htds, regex): false;
2654     if (!bypass) {
2655       htds = getTableDescriptors(htds, namespace, regex, null, includeSysTables);
2656       if (cpHost != null) cpHost.postGetTableNames(htds, regex);
2657     }
2658     List<TableName> result = new ArrayList<TableName>(htds.size());
2659     for (HTableDescriptor htd: htds) result.add(htd.getTableName());
2660     return result;
2661   }
2662 
2663   /**
2664    * @return list of table table descriptors after filtering by regex and whether to include system
2665    *    tables, etc.
2666    * @throws IOException
2667    */
2668   private List<HTableDescriptor> getTableDescriptors(final List<HTableDescriptor> htds,
2669       final String namespace, final String regex, final List<TableName> tableNameList,
2670       final boolean includeSysTables)
2671   throws IOException {
2672     if (tableNameList == null || tableNameList.size() == 0) {
2673       // request for all TableDescriptors
2674       Collection<HTableDescriptor> allHtds;
2675       if (namespace != null && namespace.length() > 0) {
2676         // Do a check on the namespace existence. Will fail if does not exist.
2677         this.clusterSchemaService.getNamespace(namespace);
2678         allHtds = tableDescriptors.getByNamespace(namespace).values();
2679       } else {
2680         allHtds = tableDescriptors.getAll().values();
2681       }
2682       for (HTableDescriptor desc: allHtds) {
2683         if (tableStateManager.isTablePresent(desc.getTableName())
2684             && (includeSysTables || !desc.getTableName().isSystemTable())) {
2685           htds.add(desc);
2686         }
2687       }
2688     } else {
2689       for (TableName s: tableNameList) {
2690         if (tableStateManager.isTablePresent(s)) {
2691           HTableDescriptor desc = tableDescriptors.get(s);
2692           if (desc != null) {
2693             htds.add(desc);
2694           }
2695         }
2696       }
2697     }
2698 
2699     // Retains only those matched by regular expression.
2700     if (regex != null) filterTablesByRegex(htds, Pattern.compile(regex));
2701     return htds;
2702   }
2703 
2704   /**
2705    * Removes the table descriptors that don't match the pattern.
2706    * @param descriptors list of table descriptors to filter
2707    * @param pattern the regex to use
2708    */
2709   private static void filterTablesByRegex(final Collection<HTableDescriptor> descriptors,
2710       final Pattern pattern) {
2711     final String defaultNS = NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
2712     Iterator<HTableDescriptor> itr = descriptors.iterator();
2713     while (itr.hasNext()) {
2714       HTableDescriptor htd = itr.next();
2715       String tableName = htd.getTableName().getNameAsString();
2716       boolean matched = pattern.matcher(tableName).matches();
2717       if (!matched && htd.getTableName().getNamespaceAsString().equals(defaultNS)) {
2718         matched = pattern.matcher(defaultNS + TableName.NAMESPACE_DELIM + tableName).matches();
2719       }
2720       if (!matched) {
2721         itr.remove();
2722       }
2723     }
2724   }
2725 
2726   @Override
2727   public long getLastMajorCompactionTimestamp(TableName table) throws IOException {
2728     return getClusterStatus().getLastMajorCompactionTsForTable(table);
2729   }
2730 
2731   @Override
2732   public long getLastMajorCompactionTimestampForRegion(byte[] regionName) throws IOException {
2733     return getClusterStatus().getLastMajorCompactionTsForRegion(regionName);
2734   }
2735 
2736   /**
2737    * Gets the mob file compaction state for a specific table.
2738    * Whether all the mob files are selected is known during the compaction execution, but
2739    * the statistic is done just before compaction starts, it is hard to know the compaction
2740    * type at that time, so the rough statistics are chosen for the mob file compaction. Only two
2741    * compaction states are available, CompactionState.MAJOR_AND_MINOR and CompactionState.NONE.
2742    * @param tableName The current table name.
2743    * @return If a given table is in mob file compaction now.
2744    */
2745   public CompactionState getMobCompactionState(TableName tableName) {
2746     AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
2747     if (compactionsCount != null && compactionsCount.get() != 0) {
2748       return CompactionState.MAJOR_AND_MINOR;
2749     }
2750     return CompactionState.NONE;
2751   }
2752 
  /**
   * Record that a mob compaction has started for the given table by bumping
   * its per-table counter; counter updates are serialized via the id-lock
   * keyed on the table name's hash.
   * @param tableName table whose mob compaction is starting
   * @throws IOException if acquiring the lock entry fails
   */
  public void reportMobCompactionStart(TableName tableName) throws IOException {
    IdLock.Entry lockEntry = null;
    try {
      // Serialize counter updates for this table against concurrent reporters.
      lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
      AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
      if (compactionsCount == null) {
        // First compaction for this table: create the counter lazily.
        compactionsCount = new AtomicInteger(0);
        mobCompactionStates.put(tableName, compactionsCount);
      }
      compactionsCount.incrementAndGet();
    } finally {
      if (lockEntry != null) {
        mobCompactionLock.releaseLockEntry(lockEntry);
      }
    }
  }
2769 
  /**
   * Record that a mob compaction has finished for the given table by
   * decrementing its per-table counter; the counter entry is removed when it
   * drops to zero. Updates are serialized via the id-lock keyed on the table
   * name's hash.
   * @param tableName table whose mob compaction has ended
   * @throws IOException if acquiring the lock entry fails
   */
  public void reportMobCompactionEnd(TableName tableName) throws IOException {
    IdLock.Entry lockEntry = null;
    try {
      lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
      AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
      if (compactionsCount != null) {
        int count = compactionsCount.decrementAndGet();
        // remove the entry if the count is 0.
        if (count == 0) {
          mobCompactionStates.remove(tableName);
        }
      }
    } finally {
      if (lockEntry != null) {
        mobCompactionLock.releaseLockEntry(lockEntry);
      }
    }
  }
2788 
2789   /**
2790    * Requests mob compaction.
2791    * @param tableName The table the compact.
2792    * @param columns The compacted columns.
2793    * @param allFiles Whether add all mob files into the compaction.
2794    */
2795   public void requestMobCompaction(TableName tableName,
2796     List<HColumnDescriptor> columns, boolean allFiles) throws IOException {
2797     mobCompactThread.requestMobCompaction(conf, fs, tableName, columns,
2798       tableLockManager, allFiles);
2799   }
2800 
2801   /**
2802    * Queries the state of the {@link LoadBalancerTracker}. If the balancer is not initialized,
2803    * false is returned.
2804    *
2805    * @return The state of the load balancer, or false if the load balancer isn't defined.
2806    */
2807   public boolean isBalancerOn() {
2808     if (null == loadBalancerTracker) return false;
2809     return loadBalancerTracker.isBalancerOn();
2810   }
2811 
2812   /**
2813    * Queries the state of the {@link RegionNormalizerTracker}. If it's not initialized,
2814    * false is returned.
2815    */
2816   public boolean isNormalizerOn() {
2817     return null == regionNormalizerTracker? false: regionNormalizerTracker.isNormalizerOn();
2818   }
2819 
2820 
2821   /**
2822    * Queries the state of the {@link SplitOrMergeTracker}. If it is not initialized,
2823    * false is returned. If switchType is illegal, false will return.
2824    * @param switchType see {@link org.apache.hadoop.hbase.client.Admin.MasterSwitchType}
2825    * @return The state of the switch
2826    */
2827   public boolean isSplitOrMergeEnabled(Admin.MasterSwitchType switchType) {
2828     if (null == splitOrMergeTracker) {
2829       return false;
2830     }
2831     return splitOrMergeTracker.isSplitOrMergeEnabled(switchType);
2832   }
2833 
2834   /**
2835    * Fetch the configured {@link LoadBalancer} class name. If none is set, a default is returned.
2836    *
2837    * @return The name of the {@link LoadBalancer} in use.
2838    */
2839   public String getLoadBalancerClassName() {
2840     return conf.get(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, LoadBalancerFactory
2841         .getDefaultLoadBalancerClass().getName());
2842   }
2843 
2844   /**
2845    * @return RegionNormalizerTracker instance
2846    */
2847   public RegionNormalizerTracker getRegionNormalizerTracker() {
2848     return regionNormalizerTracker;
2849   }
2850 
2851   public SplitOrMergeTracker getSplitOrMergeTracker() {
2852     return splitOrMergeTracker;
2853   }
2854 
2855   @Override
2856   public LoadBalancer getLoadBalancer() {
2857     return balancer;
2858   }
2859 }