View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import com.google.common.annotations.VisibleForTesting;
22  import com.google.common.collect.Lists;
23  import com.google.common.collect.Maps;
24  import com.google.protobuf.Descriptors;
25  import com.google.protobuf.Service;
26  
27  
28  import java.io.IOException;
29  import java.io.InterruptedIOException;
30  import java.lang.reflect.Constructor;
31  import java.lang.reflect.InvocationTargetException;
32  import java.net.InetAddress;
33  import java.net.InetSocketAddress;
34  import java.net.UnknownHostException;
35  import java.util.ArrayList;
36  import java.util.Arrays;
37  import java.util.Collection;
38  import java.util.Collections;
39  import java.util.Comparator;
40  import java.util.HashSet;
41  import java.util.Iterator;
42  import java.util.List;
43  import java.util.Map;
44  import java.util.Map.Entry;
45  import java.util.Set;
46  import java.util.concurrent.TimeUnit;
47  import java.util.concurrent.atomic.AtomicInteger;
48  import java.util.concurrent.atomic.AtomicReference;
49  import java.util.regex.Pattern;
50  
51  import javax.servlet.ServletException;
52  import javax.servlet.http.HttpServlet;
53  import javax.servlet.http.HttpServletRequest;
54  import javax.servlet.http.HttpServletResponse;
55  
56  import org.apache.commons.logging.Log;
57  import org.apache.commons.logging.LogFactory;
58  import org.apache.hadoop.conf.Configuration;
59  import org.apache.hadoop.fs.Path;
60  import org.apache.hadoop.hbase.ClusterStatus;
61  import org.apache.hadoop.hbase.CoordinatedStateException;
62  import org.apache.hadoop.hbase.CoordinatedStateManager;
63  import org.apache.hadoop.hbase.DoNotRetryIOException;
64  import org.apache.hadoop.hbase.HBaseIOException;
65  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
66  import org.apache.hadoop.hbase.HColumnDescriptor;
67  import org.apache.hadoop.hbase.HConstants;
68  import org.apache.hadoop.hbase.HRegionInfo;
69  import org.apache.hadoop.hbase.HTableDescriptor;
70  import org.apache.hadoop.hbase.MasterNotRunningException;
71  import org.apache.hadoop.hbase.MetaTableAccessor;
72  import org.apache.hadoop.hbase.NamespaceDescriptor;
73  import org.apache.hadoop.hbase.PleaseHoldException;
74  import org.apache.hadoop.hbase.ProcedureInfo;
75  import org.apache.hadoop.hbase.RegionStateListener;
76  import org.apache.hadoop.hbase.ScheduledChore;
77  import org.apache.hadoop.hbase.Server;
78  import org.apache.hadoop.hbase.ServerLoad;
79  import org.apache.hadoop.hbase.ServerName;
80  import org.apache.hadoop.hbase.TableDescriptors;
81  import org.apache.hadoop.hbase.TableName;
82  import org.apache.hadoop.hbase.TableNotDisabledException;
83  import org.apache.hadoop.hbase.TableNotFoundException;
84  import org.apache.hadoop.hbase.UnknownRegionException;
85  import org.apache.hadoop.hbase.classification.InterfaceAudience;
86  import org.apache.hadoop.hbase.client.MasterSwitchType;
87  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
88  import org.apache.hadoop.hbase.client.Result;
89  import org.apache.hadoop.hbase.client.TableState;
90  import org.apache.hadoop.hbase.coprocessor.BypassCoprocessorException;
91  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
92  import org.apache.hadoop.hbase.exceptions.DeserializationException;
93  import org.apache.hadoop.hbase.executor.ExecutorType;
94  import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
95  import org.apache.hadoop.hbase.ipc.RpcServer;
96  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
97  import org.apache.hadoop.hbase.master.MasterRpcServices.BalanceSwitchMode;
98  import org.apache.hadoop.hbase.master.balancer.BalancerChore;
99  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
100 import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
101 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
102 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
103 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
104 import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
105 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
106 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
107 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
108 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
109 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
110 import org.apache.hadoop.hbase.master.procedure.AddColumnFamilyProcedure;
111 import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
112 import org.apache.hadoop.hbase.master.procedure.DeleteColumnFamilyProcedure;
113 import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
114 import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
115 import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
116 import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
117 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
118 import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
119 import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure;
120 import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
121 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
122 import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
123 import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
124 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
125 import org.apache.hadoop.hbase.mob.MobConstants;
126 import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
127 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
128 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
129 import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
130 import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
131 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
132 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
133 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
134 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
135 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
136 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
137 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
138 import org.apache.hadoop.hbase.regionserver.HRegionServer;
139 import org.apache.hadoop.hbase.regionserver.HStore;
140 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
141 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
142 import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
143 import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
144 import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
145 import org.apache.hadoop.hbase.replication.master.TableCFsUpdater;
146 import org.apache.hadoop.hbase.replication.regionserver.Replication;
147 import org.apache.hadoop.hbase.security.User;
148 import org.apache.hadoop.hbase.security.UserProvider;
149 import org.apache.hadoop.hbase.util.Addressing;
150 import org.apache.hadoop.hbase.util.Bytes;
151 import org.apache.hadoop.hbase.util.CompressionTest;
152 import org.apache.hadoop.hbase.util.EncryptionTest;
153 import org.apache.hadoop.hbase.util.FSUtils;
154 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
155 import org.apache.hadoop.hbase.util.HasThread;
156 import org.apache.hadoop.hbase.util.IdLock;
157 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
158 import org.apache.hadoop.hbase.util.Pair;
159 import org.apache.hadoop.hbase.util.Threads;
160 import org.apache.hadoop.hbase.util.VersionInfo;
161 import org.apache.hadoop.hbase.util.ZKDataMigrator;
162 import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
163 import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
164 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
165 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
166 import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
167 import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
168 import org.apache.hadoop.hbase.zookeeper.SplitOrMergeTracker;
169 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
170 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
171 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
172 import org.apache.zookeeper.KeeperException;
173 import org.mortbay.jetty.Connector;
174 import org.mortbay.jetty.nio.SelectChannelConnector;
175 import org.mortbay.jetty.servlet.Context;
176
177 /**
178  * HMaster is the "master server" for HBase. An HBase cluster has one active
179  * master.  If many masters are started, all compete.  Whichever wins goes on to
180  * run the cluster.  All others park themselves in their constructor until
181  * master or cluster shutdown or until the active master loses its lease in
182  * zookeeper.  Thereafter, all running masters jostle to take over the master role.
183  *
184  * <p>The Master can be asked to shut down the cluster. See {@link #shutdown()}.  In
185  * this case it will tell all regionservers to go down and then wait on them
186  * all reporting in that they are down.  This master will then shut itself down.
187  *
188  * <p>You can also shutdown just this master.  Call {@link #stopMaster()}.
189  *
190  * @see org.apache.zookeeper.Watcher
191  */
192 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
193 @SuppressWarnings("deprecation")
194 public class HMaster extends HRegionServer implements MasterServices {
  /** Class logger; also used by the nested helper classes below. */
  private static final Log LOG = LogFactory.getLog(HMaster.class.getName());
196
197   /**
198    * Protection against zombie master. Started once Master accepts active responsibility and
199    * starts taking over responsibilities. Allows a finite time window before giving up ownership.
200    */
201   private static class InitializationMonitor extends HasThread {
202     /** The amount of time in milliseconds to sleep before checking initialization status. */
203     public static final String TIMEOUT_KEY = "hbase.master.initializationmonitor.timeout";
204     public static final long TIMEOUT_DEFAULT = TimeUnit.MILLISECONDS.convert(15, TimeUnit.MINUTES);
205
206     /**
207      * When timeout expired and initialization has not complete, call {@link System#exit(int)} when
208      * true, do nothing otherwise.
209      */
210     public static final String HALT_KEY = "hbase.master.initializationmonitor.haltontimeout";
211     public static final boolean HALT_DEFAULT = false;
212
213     private final HMaster master;
214     private final long timeout;
215     private final boolean haltOnTimeout;
216
217     /** Creates a Thread that monitors the {@link #isInitialized()} state. */
218     InitializationMonitor(HMaster master) {
219       super("MasterInitializationMonitor");
220       this.master = master;
221       this.timeout = master.getConfiguration().getLong(TIMEOUT_KEY, TIMEOUT_DEFAULT);
222       this.haltOnTimeout = master.getConfiguration().getBoolean(HALT_KEY, HALT_DEFAULT);
223       this.setDaemon(true);
224     }
225
226     @Override
227     public void run() {
228       try {
229         while (!master.isStopped() && master.isActiveMaster()) {
230           Thread.sleep(timeout);
231           if (master.isInitialized()) {
232             LOG.debug("Initialization completed within allotted tolerance. Monitor exiting.");
233           } else {
234             LOG.error("Master failed to complete initialization after " + timeout + "ms. Please"
235                 + " consider submitting a bug report including a thread dump of this process.");
236             if (haltOnTimeout) {
237               LOG.error("Zombie Master exiting. Thread dump to stdout");
238               Threads.printThreadInfo(System.out, "Zombie HMaster");
239               System.exit(-1);
240             }
241           }
242         }
243       } catch (InterruptedException ie) {
244         LOG.trace("InitMonitor thread interrupted. Existing.");
245       }
246     }
247   }
248
  // MASTER is name of the webapp and the attribute name used stuffing this
  // instance into web context.
  public static final String MASTER = "master";

  // Manager and zk listener for master election
  private final ActiveMasterManager activeMasterManager;
  // Region server tracker
  RegionServerTracker regionServerTracker;
  // Draining region server tracker
  private DrainingServerTracker drainingServerTracker;
  // Tracker for load balancer state
  LoadBalancerTracker loadBalancerTracker;

  // Tracker for split and merge state
  private SplitOrMergeTracker splitOrMergeTracker;

  // Tracker for region normalizer state
  private RegionNormalizerTracker regionNormalizerTracker;

  // Service handling namespace/table schema operations; NOTE(review): presumably
  // started during active-master initialization -- not visible in this chunk, confirm.
  private ClusterSchemaService clusterSchemaService;

  // Metrics for the HMaster
  final MetricsMaster metricsMaster;
  // file system manager for the master FS operations
  private MasterFileSystem fileSystemManager;
  // WAL manager; created in finishActiveMasterInitialization and used for log splitting.
  private MasterWalManager walManager;

  // server manager to deal with region server info
  private volatile ServerManager serverManager;

  // manager of assignment nodes in zookeeper
  private AssignmentManager assignmentManager;

  // buffer for "fatal error" notices from region servers
  // in the cluster. This is only used for assisting
  // operations/debugging.
  MemoryBoundedLogMessageBuffer rsFatals;

  // flag set after we become the active master (used for testing)
  private volatile boolean isActiveMaster = false;

  // flag set after we complete initialization once active,
  // it is not private since it's used in unit tests
  private final ProcedureEvent initialized = new ProcedureEvent("master initialized");

  // flag set after master services are started,
  // initialization may have not completed yet.
  volatile boolean serviceStarted = false;

  // flag set after we complete assignMeta.
  private final ProcedureEvent serverCrashProcessingEnabled =
    new ProcedureEvent("server crash processing");

  // Load balancer, region normalizer, and the chores that run them periodically.
  private LoadBalancer balancer;
  private RegionNormalizer normalizer;
  private BalancerChore balancerChore;
  private RegionNormalizerChore normalizerChore;
  private ClusterStatusChore clusterStatusChore;
  private ClusterStatusPublisher clusterStatusPublisherChore = null;
  private PeriodicDoMetrics periodicDoMetricsChore = null;

  // Chore that cleans up hbase:meta (plus log/hfile cleaners and mob chores below).
  CatalogJanitor catalogJanitorChore;
  private LogCleaner logCleaner;
  private HFileCleaner hfileCleaner;
  private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
  private MobCompactionChore mobCompactChore;
  private MasterMobCompactionThread mobCompactThread;
  // used to synchronize the mobCompactionStates
  private final IdLock mobCompactionLock = new IdLock();
  // save the information of mob compactions in tables.
  // the key is table name, the value is the number of compactions in that table.
  private Map<TableName, AtomicInteger> mobCompactionStates = Maps.newConcurrentMap();

  // Coprocessor host for master coprocessors; created in finishActiveMasterInitialization.
  MasterCoprocessorHost cpHost;

  // Whether to warm the table-descriptor cache at startup (set from configuration
  // in the constructor).
  private final boolean preLoadTableDescriptors;

  // Time stamps for when a hmaster became active
  private long masterActiveTime;

  // should we check the compression codec type at master side, default true, HBASE-6370
  private final boolean masterCheckCompression;

  // should we check encryption settings at master side, default true
  private final boolean masterCheckEncryption;

  // Registered master coprocessor endpoint services, keyed by service name.
  Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  // monitor for snapshot of hbase tables
  SnapshotManager snapshotManager;
  // monitor for distributed procedures
  private MasterProcedureManagerHost mpmHost;

  // it is assigned after 'initialized' guard set to true, so should be volatile
  private volatile MasterQuotaManager quotaManager;

  // Procedure-v2 executor and its WAL-backed store.
  private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
  private WALProcedureStore procedureStore;

  // handle table states
  private TableStateManager tableStateManager;

  // NOTE(review): counters presumably incremented when split/merge normalization plans
  // are issued -- update sites are not visible in this chunk, confirm.
  private long splitPlanCount;
  private long mergePlanCount;

  /** flag used in test cases in order to simulate RS failures during master initialization */
  private volatile boolean initializationBeforeMetaAssignment = false;

  /** jetty server for master to redirect requests to regionserver infoServer */
  private org.mortbay.jetty.Server masterJettyServer;
359
360   public static class RedirectServlet extends HttpServlet {
361     private static final long serialVersionUID = 2894774810058302472L;
362     private static int regionServerInfoPort;
363
364     @Override
365     public void doGet(HttpServletRequest request,
366         HttpServletResponse response) throws ServletException, IOException {
367       String redirectUrl = request.getScheme() + "://"
368         + request.getServerName() + ":" + regionServerInfoPort
369         + request.getRequestURI();
370       response.sendRedirect(redirectUrl);
371     }
372   }
373
374   private static class PeriodicDoMetrics extends ScheduledChore {
375     private final HMaster server;
376     public PeriodicDoMetrics(int doMetricsInterval, final HMaster server) {
377       super(server.getServerName() + "-DoMetricsChore", server, doMetricsInterval);
378       this.server = server;
379     }
380
381     @Override
382     protected void chore() {
383       server.doMetrics();
384     }
385   }
386
  /**
   * Initializes the HMaster. The steps are as follows:
   * <p>
   * <ol>
   * <li>Initialize the local HRegionServer
   * <li>Start the ActiveMasterManager.
   * </ol>
   * <p>
   * Remaining steps of initialization occur in
   * #finishActiveMasterInitialization(MonitoredTask) after
   * the master becomes the active one.
   *
   * @param conf the cluster configuration; some settings are mutated here (see below)
   * @param csm coordinated state manager passed through to the embedded HRegionServer
   * @throws IOException if superclass (region server) initialization fails
   * @throws KeeperException if ZooKeeper interaction fails
   */
  public HMaster(final Configuration conf, CoordinatedStateManager csm)
      throws IOException, KeeperException {
    super(conf, csm);
    // Ring buffer of region server fatal-error notices, sized by configuration (1MB default).
    this.rsFatals = new MemoryBoundedLogMessageBuffer(
      conf.getLong("hbase.master.buffer.for.rs.fatals", 1*1024*1024));

    LOG.info("hbase.rootdir=" + FSUtils.getRootDir(this.conf) +
      ", hbase.cluster.distributed=" + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));

    // Disable usage of meta replicas in the master
    this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);

    Replication.decorateMasterConfiguration(this.conf);

    // Hack! Maps DFSClient => Master for logs.  HDFS made this
    // config param for task trackers, but we can piggyback off of it.
    if (this.conf.get("mapreduce.task.attempt.id") == null) {
      this.conf.set("mapreduce.task.attempt.id", "hb_m_" + this.serverName.toString());
    }

    // should we check the compression codec type at master side, default true, HBASE-6370
    this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true);

    // should we check encryption settings at master side, default true
    this.masterCheckEncryption = conf.getBoolean("hbase.master.check.encryption", true);

    this.metricsMaster = new MetricsMaster(new MetricsMasterWrapperImpl(this));

    // preload table descriptor at startup
    this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);

    // Do we publish the status? If so, resolve the publisher class and start the chore.
    boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
        HConstants.STATUS_PUBLISHED_DEFAULT);
    Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
        conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
            ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
            ClusterStatusPublisher.Publisher.class);

    if (shouldPublish) {
      if (publisherClass == null) {
        LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
            ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS +
            " is not set - not publishing status");
      } else {
        clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
        getChoreService().scheduleChore(clusterStatusPublisherChore);
      }
    }

    // Some unit tests don't need a cluster, so no zookeeper at all
    if (!conf.getBoolean("hbase.testing.nocluster", false)) {
      activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this);
      int infoPort = putUpJettyServer();
      startActiveMasterManager(infoPort);
    } else {
      activeMasterManager = null;
    }
  }
459
  /**
   * Puts up a jetty server that redirects requests on the master info port to the
   * regionserver's embedded info server running in this same process.
   *
   * @return the actual info port; -1 means the info server is disabled (no redirecting)
   * @throws IOException if the configured bind address is not local, or jetty fails to start
   */
  private int putUpJettyServer() throws IOException {
    if (!conf.getBoolean("hbase.master.infoserver.redirect", true)) {
      return -1;
    }
    int infoPort = conf.getInt("hbase.master.info.port.orig",
      HConstants.DEFAULT_MASTER_INFOPORT);
    // -1 is for disabling info server, so no redirecting
    if (infoPort < 0 || infoServer == null) {
      return -1;
    }
    String addr = conf.get("hbase.master.info.bindAddress", "0.0.0.0");
    if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
      String msg =
          "Failed to start redirecting jetty server. Address " + addr
              + " does not belong to this host. Correct configuration parameter: "
              + "hbase.master.info.bindAddress";
      LOG.error(msg);
      throw new IOException(msg);
    }

    // Tell the servlet where to redirect before the server starts serving.
    RedirectServlet.regionServerInfoPort = infoServer.getPort();
    // If the regionserver info server already answers on the requested port,
    // no redirect server is needed.
    if(RedirectServlet.regionServerInfoPort == infoPort) {
      return infoPort;
    }
    masterJettyServer = new org.mortbay.jetty.Server();
    Connector connector = new SelectChannelConnector();
    connector.setHost(addr);
    connector.setPort(infoPort);
    masterJettyServer.addConnector(connector);
    masterJettyServer.setStopAtShutdown(true);
    Context context = new Context(masterJettyServer, "/", Context.NO_SESSIONS);
    context.addServlet(RedirectServlet.class, "/*");
    try {
      masterJettyServer.start();
    } catch (Exception e) {
      throw new IOException("Failed to start redirecting jetty server", e);
    }
    return connector.getLocalPort();
  }
500
  @Override
  protected TableDescriptors getFsTableDescriptors() throws IOException {
    // NOTE(review): delegates unchanged to the superclass -- presumably kept to make
    // the override point explicit for the master; confirm before removing.
    return super.getFsTableDescriptors();
  }
505
506   /**
507    * For compatibility, if failed with regionserver credentials, try the master one
508    */
509   @Override
510   protected void login(UserProvider user, String host) throws IOException {
511     try {
512       super.login(user, host);
513     } catch (IOException ie) {
514       user.login("hbase.master.keytab.file",
515         "hbase.master.kerberos.principal", host);
516     }
517   }
518
519   /**
520    * If configured to put regions on active master,
521    * wait till a backup master becomes active.
522    * Otherwise, loop till the server is stopped or aborted.
523    */
524   @Override
525   protected void waitForMasterActive(){
526     boolean tablesOnMaster = BaseLoadBalancer.tablesOnMaster(conf);
527     while (!(tablesOnMaster && isActiveMaster)
528         && !isStopped() && !isAborted()) {
529       sleeper.sleep();
530     }
531   }
532
  /** @return the RPC services of this master; exposed for tests. */
  @VisibleForTesting
  public MasterRpcServices getMasterRpcServices() {
    // rpcServices is created by createRpcServices() in this class, so the cast is safe.
    return (MasterRpcServices)rpcServices;
  }
537
  /**
   * Turns the load balancer on or off, asynchronously.
   * @param b true to enable the balancer, false to disable it
   * @return the value returned by the RPC-side balancer switch -- presumably the
   *         previous balancer state; confirm against MasterRpcServices#switchBalancer
   * @throws IOException if the switch fails
   */
  public boolean balanceSwitch(final boolean b) throws IOException {
    return getMasterRpcServices().switchBalancer(b, BalanceSwitchMode.ASYNC);
  }
541
  /** @return the process name used in thread/log naming ("master"). */
  @Override
  protected String getProcessName() {
    return MASTER;
  }
546
  /** The master (unlike a plain region server) may create the base znode hierarchy. */
  @Override
  protected boolean canCreateBaseZNode() {
    return true;
  }
551
  /** The master may update table descriptors on the filesystem. */
  @Override
  protected boolean canUpdateTableDescriptor() {
    return true;
  }
556
  /** Creates master-specific RPC services instead of the region server default. */
  @Override
  protected RSRpcServices createRpcServices() throws IOException {
    return new MasterRpcServices(this);
  }
561
562   @Override
563   protected void configureInfoServer() {
564     infoServer.addServlet("master-status", "/master-status", MasterStatusServlet.class);
565     infoServer.setAttribute(MASTER, this);
566     if (BaseLoadBalancer.tablesOnMaster(conf)) {
567       super.configureInfoServer();
568     }
569   }
570
  /** @return the master-specific debug dump servlet class. */
  @Override
  protected Class<? extends HttpServlet> getDumpServlet() {
    return MasterDumpServlet.class;
  }
575
576   /**
577    * Emit the HMaster metrics, such as region in transition metrics.
578    * Surrounding in a try block just to be sure metrics doesn't abort HMaster.
579    */
580   private void doMetrics() {
581     try {
582       if (assignmentManager != null) {
583         assignmentManager.updateRegionsInTransitionMetrics();
584       }
585     } catch (Throwable e) {
586       LOG.error("Couldn't update metrics: " + e.getMessage());
587     }
588   }
589
  /** @return the metrics holder for this master (created in the constructor). */
  MetricsMaster getMasterMetrics() {
    return metricsMaster;
  }
593
  /**
   * Initialize all ZK based system trackers: load balancer, region normalizer,
   * split/merge, region server and draining server trackers, plus the assignment
   * manager and procedure managers. Also flips the cluster-up flag in ZooKeeper
   * so region servers waiting on it can proceed with their startup.
   * Order matters: trackers are constructed and started before the cluster-up
   * flag is set, and the procedure managers come last.
   */
  void initializeZKBasedSystemTrackers() throws IOException,
      InterruptedException, KeeperException, CoordinatedStateException {
    this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
    this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
    this.normalizer.setMasterServices(this);
    this.normalizer.setMasterRpcServices((MasterRpcServices)rpcServices);
    this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
    this.loadBalancerTracker.start();

    this.regionNormalizerTracker = new RegionNormalizerTracker(zooKeeper, this);
    this.regionNormalizerTracker.start();

    this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
    this.splitOrMergeTracker.start();

    // Assignment manager needs the balancer and state managers created above.
    this.assignmentManager = new AssignmentManager(this, serverManager,
      this.balancer, this.service, this.metricsMaster,
      this.tableLockManager, tableStateManager);

    this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager);
    this.regionServerTracker.start();

    this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
    this.drainingServerTracker.start();

    // Set the cluster as up.  If new RSs, they'll be waiting on this before
    // going ahead with their startup.
    boolean wasUp = this.clusterStatusTracker.isClusterUp();
    if (!wasUp) this.clusterStatusTracker.setClusterUp();

    LOG.info("Server active/primary master=" + this.serverName +
        ", sessionid=0x" +
        Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
        ", setting cluster-up flag (Was=" + wasUp + ")");

    // create/initialize the snapshot manager and other procedure managers
    this.snapshotManager = new SnapshotManager();
    this.mpmHost = new MasterProcedureManagerHost();
    this.mpmHost.register(this.snapshotManager);
    this.mpmHost.register(new MasterFlushTableProcedureManager());
    this.mpmHost.loadProcedures(conf);
    this.mpmHost.initialize(this, this.metricsMaster);
  }
641
642   /**
643    * Finish initialization of HMaster after becoming the primary master.
644    *
645    * <ol>
646    * <li>Initialize master components - file system manager, server manager,
647    *     assignment manager, region server tracker, etc</li>
648  * <li>Start necessary service threads - balancer, catalog janitor,
649    *     executor services, etc</li>
650    * <li>Set cluster as UP in ZooKeeper</li>
651    * <li>Wait for RegionServers to check-in</li>
652    * <li>Split logs and perform data recovery, if necessary</li>
653  * <li>Ensure assignment of meta/namespace regions</li>
654    * <li>Handle either fresh cluster start or master failover</li>
655    * </ol>
656    */
657   private void finishActiveMasterInitialization(MonitoredTask status)
658       throws IOException, InterruptedException, KeeperException, CoordinatedStateException {
659
660     isActiveMaster = true;
661     Thread zombieDetector = new Thread(new InitializationMonitor(this));
662     zombieDetector.start();
663
664     /*
665      * We are active master now... go initialize components we need to run.
666      * Note, there may be dross in zk from previous runs; it'll get addressed
667      * below after we determine if cluster startup or failover.
668      */
669
670     status.setStatus("Initializing Master file system");
671
672     this.masterActiveTime = System.currentTimeMillis();
673     // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
674     this.fileSystemManager = new MasterFileSystem(this);
675     this.walManager = new MasterWalManager(this);
676
677     // enable table descriptors cache
678     this.tableDescriptors.setCacheOn();
679     // set the META's descriptor to the correct replication
680     this.tableDescriptors.get(TableName.META_TABLE_NAME).setRegionReplication(
681         conf.getInt(HConstants.META_REPLICAS_NUM, HConstants.DEFAULT_META_REPLICA_NUM));
682     // warm-up HTDs cache on master initialization
683     if (preLoadTableDescriptors) {
684       status.setStatus("Pre-loading table descriptors");
685       this.tableDescriptors.getAll();
686     }
687
688     // publish cluster ID
689     status.setStatus("Publishing Cluster ID in ZooKeeper");
690     ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
691
692     this.serverManager = createServerManager(this, this);
693
694     // Invalidate all write locks held previously
695     this.tableLockManager.reapWriteLocks();
696     this.tableStateManager = new TableStateManager(this);
697
698     status.setStatus("Initializing ZK system trackers");
699     initializeZKBasedSystemTrackers();
700
701     // This is for backwards compatibility
702     // See HBASE-11393
703     status.setStatus("Update TableCFs node in ZNode");
704     TableCFsUpdater tableCFsUpdater = new TableCFsUpdater(zooKeeper,
705             conf, this.clusterConnection);
706     tableCFsUpdater.update();
707
708     // initialize master side coprocessors before we start handling requests
709     status.setStatus("Initializing master coprocessors");
710     this.cpHost = new MasterCoprocessorHost(this, this.conf);
711
712     // start up all service threads.
713     status.setStatus("Initializing master service threads");
714     startServiceThreads();
715
716     // Wake up this server to check in
717     sleeper.skipSleepCycle();
718
719     // Wait for region servers to report in
720     this.serverManager.waitForRegionServers(status);
721     // Check zk for region servers that are up but didn't register
722     for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
723       // The isServerOnline check is opportunistic, correctness is handled inside
724       if (!this.serverManager.isServerOnline(sn)
725           && serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
726         LOG.info("Registered server found up in zk but who has not yet reported in: " + sn);
727       }
728     }
729
730     // get a list for previously failed RS which need log splitting work
731     // we recover hbase:meta region servers inside master initialization and
732     // handle other failed servers in SSH in order to start up master node ASAP
733     Set<ServerName> previouslyFailedServers =
734       this.walManager.getFailedServersFromLogFolders();
735
736     // log splitting for hbase:meta server
737     ServerName oldMetaServerLocation = metaTableLocator.getMetaRegionLocation(this.getZooKeeper());
738     if (oldMetaServerLocation != null && previouslyFailedServers.contains(oldMetaServerLocation)) {
739       splitMetaLogBeforeAssignment(oldMetaServerLocation);
740       // Note: we can't remove oldMetaServerLocation from previousFailedServers list because it
741       // may also host user regions
742     }
743     Set<ServerName> previouslyFailedMetaRSs = getPreviouselyFailedMetaServersFromZK();
744     // need to use union of previouslyFailedMetaRSs recorded in ZK and previouslyFailedServers
745     // instead of previouslyFailedMetaRSs alone to address the following two situations:
746     // 1) the chained failure situation(recovery failed multiple times in a row).
747     // 2) master get killed right before it could delete the recovering hbase:meta from ZK while the
748     // same server still has non-meta wals to be replayed so that
749     // removeStaleRecoveringRegionsFromZK can't delete the stale hbase:meta region
750     // Passing more servers into splitMetaLog is all right. If a server doesn't have hbase:meta wal,
751     // there is no op for the server.
752     previouslyFailedMetaRSs.addAll(previouslyFailedServers);
753
754     this.initializationBeforeMetaAssignment = true;
755
756     // Wait for regionserver to finish initialization.
757     if (BaseLoadBalancer.tablesOnMaster(conf)) {
758       waitForServerOnline();
759     }
760
761     //initialize load balancer
762     this.balancer.setClusterStatus(getClusterStatus());
763     this.balancer.setMasterServices(this);
764     this.balancer.initialize();
765
766     // Check if master is shutting down because of some issue
767     // in initializing the regionserver or the balancer.
768     if (isStopped()) return;
769
770     // Make sure meta assigned before proceeding.
771     status.setStatus("Assigning Meta Region");
772     assignMeta(status, previouslyFailedMetaRSs, HRegionInfo.DEFAULT_REPLICA_ID);
773     // check if master is shutting down because above assignMeta could return even hbase:meta isn't
774     // assigned when master is shutting down
775     if (isStopped()) return;
776
777     // migrating existent table state from zk, so splitters
778     // and recovery process treat states properly.
779     for (Map.Entry<TableName, TableState.State> entry : ZKDataMigrator
780         .queryForTableStates(getZooKeeper()).entrySet()) {
781       LOG.info("Converting state from zk to new states:" + entry);
782       tableStateManager.setTableState(entry.getKey(), entry.getValue());
783     }
784     ZKUtil.deleteChildrenRecursively(getZooKeeper(), getZooKeeper().tableZNode);
785
786     status.setStatus("Submitting log splitting work for previously failed region servers");
787     // Master has recovered hbase:meta region server and we put
788     // other failed region servers in a queue to be handled later by SSH
789     for (ServerName tmpServer : previouslyFailedServers) {
790       this.serverManager.processDeadServer(tmpServer, true);
791     }
792
793     // Fix up assignment manager status
794     status.setStatus("Starting assignment manager");
795     this.assignmentManager.joinCluster();
796
797     // set cluster status again after user regions are assigned
798     this.balancer.setClusterStatus(getClusterStatus());
799
800     // Start balancer and meta catalog janitor after meta and regions have been assigned.
801     status.setStatus("Starting balancer and catalog janitor");
802     this.clusterStatusChore = new ClusterStatusChore(this, balancer);
803     getChoreService().scheduleChore(clusterStatusChore);
804     this.balancerChore = new BalancerChore(this);
805     getChoreService().scheduleChore(balancerChore);
806     this.normalizerChore = new RegionNormalizerChore(this);
807     getChoreService().scheduleChore(normalizerChore);
808     this.catalogJanitorChore = new CatalogJanitor(this, this);
809     getChoreService().scheduleChore(catalogJanitorChore);
810
811     // Do Metrics periodically
812     periodicDoMetricsChore = new PeriodicDoMetrics(msgInterval, this);
813     getChoreService().scheduleChore(periodicDoMetricsChore);
814
815     status.setStatus("Starting cluster schema service");
816     initClusterSchemaService();
817
818     if (this.cpHost != null) {
819       try {
820         this.cpHost.preMasterInitialization();
821       } catch (IOException e) {
822         LOG.error("Coprocessor preMasterInitialization() hook failed", e);
823       }
824     }
825
826     status.markComplete("Initialization successful");
827     LOG.info("Master has completed initialization");
828     configurationManager.registerObserver(this.balancer);
829
830     // Set master as 'initialized'.
831     setInitialized(true);
832
833     // assign the meta replicas
834     Set<ServerName> EMPTY_SET = new HashSet<ServerName>();
835     int numReplicas = conf.getInt(HConstants.META_REPLICAS_NUM,
836            HConstants.DEFAULT_META_REPLICA_NUM);
837     for (int i = 1; i < numReplicas; i++) {
838       assignMeta(status, EMPTY_SET, i);
839     }
840     unassignExcessMetaReplica(zooKeeper, numReplicas);
841
842     status.setStatus("Starting quota manager");
843     initQuotaManager();
844
845     // clear the dead servers with same host name and port of online server because we are not
846     // removing dead server with same hostname and port of rs which is trying to check in before
847     // master initialization. See HBASE-5916.
848     this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
849
850     // Check and set the znode ACLs if needed in case we are overtaking a non-secure configuration
851     status.setStatus("Checking ZNode ACLs");
852     zooKeeper.checkAndSetZNodeAcls();
853
854     status.setStatus("Calling postStartMaster coprocessors");
855
856     this.expiredMobFileCleanerChore = new ExpiredMobFileCleanerChore(this);
857     getChoreService().scheduleChore(expiredMobFileCleanerChore);
858
859     int mobCompactionPeriod = conf.getInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD,
860       MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD);
861     if (mobCompactionPeriod > 0) {
862       this.mobCompactChore = new MobCompactionChore(this, mobCompactionPeriod);
863       getChoreService().scheduleChore(mobCompactChore);
864     } else {
865       LOG
866         .info("The period is " + mobCompactionPeriod + " seconds, MobCompactionChore is disabled");
867     }
868     this.mobCompactThread = new MasterMobCompactionThread(this);
869
870     if (this.cpHost != null) {
871       // don't let cp initialization errors kill the master
872       try {
873         this.cpHost.postStartMaster();
874       } catch (IOException ioe) {
875         LOG.error("Coprocessor postStartMaster() hook failed", ioe);
876       }
877     }
878
879     zombieDetector.interrupt();
880   }
881   /**
882    * Create a {@link ServerManager} instance.
883    */
884   ServerManager createServerManager(final Server master,
885       final MasterServices services)
886   throws IOException {
887     // We put this out here in a method so can do a Mockito.spy and stub it out
888     // w/ a mocked up ServerManager.
889     setupClusterConnection();
890     return new ServerManager(master, services);
891   }
892
893   private void unassignExcessMetaReplica(ZooKeeperWatcher zkw, int numMetaReplicasConfigured) {
894     // unassign the unneeded replicas (for e.g., if the previous master was configured
895     // with a replication of 3 and now it is 2, we need to unassign the 1 unneeded replica)
896     try {
897       List<String> metaReplicaZnodes = zooKeeper.getMetaReplicaNodes();
898       for (String metaReplicaZnode : metaReplicaZnodes) {
899         int replicaId = zooKeeper.getMetaReplicaIdFromZnode(metaReplicaZnode);
900         if (replicaId >= numMetaReplicasConfigured) {
901           RegionState r = MetaTableLocator.getMetaRegionState(zkw, replicaId);
902           LOG.info("Closing excess replica of meta region " + r.getRegion());
903           // send a close and wait for a max of 30 seconds
904           ServerManager.closeRegionSilentlyAndWait(getClusterConnection(), r.getServerName(),
905               r.getRegion(), 30000);
906           ZKUtil.deleteNode(zkw, zkw.getZNodeForReplica(replicaId));
907         }
908       }
909     } catch (Exception ex) {
910       // ignore the exception since we don't want the master to be wedged due to potential
911       // issues in the cleanup of the extra regions. We can do that cleanup via hbck or manually
912       LOG.warn("Ignoring exception " + ex);
913     }
914   }
915
916   /**
917    * Check <code>hbase:meta</code> is assigned. If not, assign it.
918    */
919   void assignMeta(MonitoredTask status, Set<ServerName> previouslyFailedMetaRSs, int replicaId)
920       throws InterruptedException, IOException, KeeperException {
921     // Work on meta region
922     int assigned = 0;
923     long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000);
924     if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
925       status.setStatus("Assigning hbase:meta region");
926     } else {
927       status.setStatus("Assigning hbase:meta region, replicaId " + replicaId);
928     }
929
930     // Get current meta state from zk.
931     RegionState metaState = MetaTableLocator.getMetaRegionState(getZooKeeper(), replicaId);
932     HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO,
933         replicaId);
934     RegionStates regionStates = assignmentManager.getRegionStates();
935     regionStates.createRegionState(hri, metaState.getState(),
936         metaState.getServerName(), null);
937
938     if (!metaState.isOpened() || !metaTableLocator.verifyMetaRegionLocation(
939         this.getClusterConnection(), this.getZooKeeper(), timeout, replicaId)) {
940       ServerName currentMetaServer = metaState.getServerName();
941       if (serverManager.isServerOnline(currentMetaServer)) {
942         if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
943           LOG.info("Meta was in transition on " + currentMetaServer);
944         } else {
945           LOG.info("Meta with replicaId " + replicaId + " was in transition on " +
946                     currentMetaServer);
947         }
948         assignmentManager.processRegionsInTransition(Arrays.asList(metaState));
949       } else {
950         if (currentMetaServer != null) {
951           if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
952             splitMetaLogBeforeAssignment(currentMetaServer);
953             regionStates.logSplit(HRegionInfo.FIRST_META_REGIONINFO);
954             previouslyFailedMetaRSs.add(currentMetaServer);
955           }
956         }
957         LOG.info("Re-assigning hbase:meta with replicaId, " + replicaId +
958             " it was on " + currentMetaServer);
959         assignmentManager.assignMeta(hri);
960       }
961       assigned++;
962     }
963
964     if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
965       // TODO: should we prevent from using state manager before meta was initialized?
966       // tableStateManager.start();
967       getTableStateManager().setTableState(TableName.META_TABLE_NAME, TableState.State.ENABLED);
968     }
969
970     if ((RecoveryMode.LOG_REPLAY == this.getMasterWalManager().getLogRecoveryMode())
971         && (!previouslyFailedMetaRSs.isEmpty())) {
972       // replay WAL edits mode need new hbase:meta RS is assigned firstly
973       status.setStatus("replaying log for Meta Region");
974       this.walManager.splitMetaLog(previouslyFailedMetaRSs);
975     }
976
977     this.assignmentManager.setEnabledTable(TableName.META_TABLE_NAME);
978     tableStateManager.start();
979
980     // Make sure a hbase:meta location is set. We need to enable SSH here since
981     // if the meta region server is died at this time, we need it to be re-assigned
982     // by SSH so that system tables can be assigned.
983     // No need to wait for meta is assigned = 0 when meta is just verified.
984     if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) enableCrashedServerProcessing(assigned != 0);
985     LOG.info("hbase:meta with replicaId " + replicaId + " assigned=" + assigned + ", location="
986       + metaTableLocator.getMetaRegionLocation(this.getZooKeeper(), replicaId));
987     status.setStatus("META assigned.");
988   }
989
990   void initClusterSchemaService() throws IOException, InterruptedException {
991     this.clusterSchemaService = new ClusterSchemaServiceImpl(this);
992     this.clusterSchemaService.startAndWait();
993     if (!this.clusterSchemaService.isRunning()) throw new HBaseIOException("Failed start");
994   }
995
996   void initQuotaManager() throws IOException {
997     MasterQuotaManager quotaManager = new MasterQuotaManager(this);
998     this.assignmentManager.setRegionStateListener((RegionStateListener)quotaManager);
999     quotaManager.start();
1000     this.quotaManager = quotaManager;
1001   }
1002
1003   boolean isCatalogJanitorEnabled() {
1004     return catalogJanitorChore != null ?
1005       catalogJanitorChore.getEnabled() : false;
1006   }
1007
1008   private void splitMetaLogBeforeAssignment(ServerName currentMetaServer) throws IOException {
1009     if (RecoveryMode.LOG_REPLAY == this.getMasterWalManager().getLogRecoveryMode()) {
1010       // In log replay mode, we mark hbase:meta region as recovering in ZK
1011       Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
1012       regions.add(HRegionInfo.FIRST_META_REGIONINFO);
1013       this.walManager.prepareLogReplay(currentMetaServer, regions);
1014     } else {
1015       // In recovered.edits mode: create recovered edits file for hbase:meta server
1016       this.walManager.splitMetaLog(currentMetaServer);
1017     }
1018   }
1019
  /**
   * Turn on ServerCrashProcedure handling (if it was disabled), flushing any
   * queued dead servers, and optionally block until an hbase:meta location is set.
   * @param waitForMeta if true, wait until hbase:meta's location appears in ZK
   */
  private void enableCrashedServerProcessing(final boolean waitForMeta)
  throws IOException, InterruptedException {
    // If crashed server processing is disabled, we enable it and expire those dead but not expired
    // servers. This is required so that if meta is assigning to a server which dies after
    // assignMeta starts assignment, ServerCrashProcedure can re-assign it. Otherwise, we will be
    // stuck here waiting forever if waitForMeta is specified.
    if (!isServerCrashProcessingEnabled()) {
      setServerCrashProcessingEnabled(true);
      this.serverManager.processQueuedDeadServers();
    }

    if (waitForMeta) {
      metaTableLocator.waitMetaRegionLocation(this.getZooKeeper());
    }
  }
1035
1036   /**
1037    * This function returns a set of region server names under hbase:meta recovering region ZK node
1038    * @return Set of meta server names which were recorded in ZK
1039    */
1040   private Set<ServerName> getPreviouselyFailedMetaServersFromZK() throws KeeperException {
1041     Set<ServerName> result = new HashSet<ServerName>();
1042     String metaRecoveringZNode = ZKUtil.joinZNode(zooKeeper.recoveringRegionsZNode,
1043       HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
1044     List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(zooKeeper, metaRecoveringZNode);
1045     if (regionFailedServers == null) return result;
1046
1047     for(String failedServer : regionFailedServers) {
1048       ServerName server = ServerName.parseServerName(failedServer);
1049       result.add(server);
1050     }
1051     return result;
1052   }
1053
  /** @return the cached table descriptors accessor for this master. */
  @Override
  public TableDescriptors getTableDescriptors() {
    return this.tableDescriptors;
  }
1058
  /** @return the {@link ServerManager} tracking live/dead region servers. */
  @Override
  public ServerManager getServerManager() {
    return this.serverManager;
  }
1063
  /** @return the {@link MasterFileSystem} managing the master's filesystem layout. */
  @Override
  public MasterFileSystem getMasterFileSystem() {
    return this.fileSystemManager;
  }
1068
  /** @return the {@link MasterWalManager} handling WAL directories and log splitting. */
  @Override
  public MasterWalManager getMasterWalManager() {
    return this.walManager;
  }
1073
  /** @return the {@link TableStateManager} tracking enabled/disabled table states. */
  @Override
  public TableStateManager getTableStateManager() {
    return tableStateManager;
  }
1078
1079   /*
1080    * Start up all services. If any of these threads gets an unhandled exception
1081    * then they just die with a logged message.  This should be fine because
1082    * in general, we do not expect the master to get such unhandled exceptions
1083    *  as OOMEs; it should be lightly loaded. See what HRegionServer does if
1084    *  need to install an unexpected exception handler.
1085    */
1086   private void startServiceThreads() throws IOException{
1087    // Start the executor service pools
1088    this.service.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
1089       conf.getInt("hbase.master.executor.openregion.threads", 5));
1090    this.service.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
1091       conf.getInt("hbase.master.executor.closeregion.threads", 5));
1092    this.service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
1093       conf.getInt("hbase.master.executor.serverops.threads", 5));
1094    this.service.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
1095       conf.getInt("hbase.master.executor.meta.serverops.threads", 5));
1096    this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
1097       conf.getInt("hbase.master.executor.logreplayops.threads", 10));
1098
1099    // We depend on there being only one instance of this executor running
1100    // at a time.  To do concurrency, would need fencing of enable/disable of
1101    // tables.
1102    // Any time changing this maxThreads to > 1, pls see the comment at
1103    // AccessController#postCompletedCreateTableAction
1104    this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1105    startProcedureExecutor();
1106
1107    // Start log cleaner thread
1108    int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
1109    this.logCleaner =
1110       new LogCleaner(cleanerInterval,
1111          this, conf, getMasterWalManager().getFileSystem(),
1112          getMasterWalManager().getOldLogDir());
1113     getChoreService().scheduleChore(logCleaner);
1114
1115    //start the hfile archive cleaner thread
1116     Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
1117     this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
1118         .getFileSystem(), archiveDir);
1119     getChoreService().scheduleChore(hfileCleaner);
1120     serviceStarted = true;
1121     if (LOG.isTraceEnabled()) {
1122       LOG.trace("Started service threads");
1123     }
1124   }
1125
  /**
   * Interrupt threads for shutdown; additionally stops the procedure executor so
   * no procedures keep running once shutdown has been requested.
   */
  @Override
  protected void sendShutdownInterrupt() {
    super.sendShutdownInterrupt();
    stopProcedureExecutor();
  }
1131
1132   @Override
1133   protected void stopServiceThreads() {
1134     if (masterJettyServer != null) {
1135       LOG.info("Stopping master jetty server");
1136       try {
1137         masterJettyServer.stop();
1138       } catch (Exception e) {
1139         LOG.error("Failed to stop master jetty server", e);
1140       }
1141     }
1142     super.stopServiceThreads();
1143     stopChores();
1144
1145     // Wait for all the remaining region servers to report in IFF we were
1146     // running a cluster shutdown AND we were NOT aborting.
1147     if (!isAborted() && this.serverManager != null &&
1148         this.serverManager.isClusterShutdown()) {
1149       this.serverManager.letRegionServersShutdown();
1150     }
1151     if (LOG.isDebugEnabled()) {
1152       LOG.debug("Stopping service threads");
1153     }
1154     // Clean up and close up shop
1155     if (this.logCleaner != null) this.logCleaner.cancel(true);
1156     if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
1157     if (this.quotaManager != null) this.quotaManager.stop();
1158     if (this.activeMasterManager != null) this.activeMasterManager.stop();
1159     if (this.serverManager != null) this.serverManager.stop();
1160     if (this.assignmentManager != null) this.assignmentManager.stop();
1161     if (this.walManager != null) this.walManager.stop();
1162     if (this.fileSystemManager != null) this.fileSystemManager.stop();
1163     if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
1164   }
1165
1166   private void startProcedureExecutor() throws IOException {
1167     final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
1168     final Path logDir = new Path(fileSystemManager.getRootDir(),
1169         MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
1170
1171     procedureStore = new WALProcedureStore(conf, fileSystemManager.getFileSystem(), logDir,
1172         new MasterProcedureEnv.WALStoreLeaseRecovery(this));
1173     procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
1174     procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore,
1175         procEnv.getProcedureQueue());
1176
1177     final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
1178         Math.max(Runtime.getRuntime().availableProcessors(),
1179           MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
1180     final boolean abortOnCorruption = conf.getBoolean(
1181         MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
1182         MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
1183     procedureStore.start(numThreads);
1184     procedureExecutor.start(numThreads, abortOnCorruption);
1185   }
1186
  /**
   * Stop the procedure executor and then its backing store. The executor is
   * stopped first so nothing writes to the store while it shuts down.
   */
  private void stopProcedureExecutor() {
    if (procedureExecutor != null) {
      procedureExecutor.stop();
    }

    if (procedureStore != null) {
      // Pass the abort flag so the store knows whether this is a clean shutdown.
      procedureStore.stop(isAborted());
    }
  }
1196
1197   private void stopChores() {
1198     if (this.expiredMobFileCleanerChore != null) {
1199       this.expiredMobFileCleanerChore.cancel(true);
1200     }
1201     if (this.mobCompactChore != null) {
1202       this.mobCompactChore.cancel(true);
1203     }
1204     if (this.balancerChore != null) {
1205       this.balancerChore.cancel(true);
1206     }
1207     if (this.normalizerChore != null) {
1208       this.normalizerChore.cancel(true);
1209     }
1210     if (this.clusterStatusChore != null) {
1211       this.clusterStatusChore.cancel(true);
1212     }
1213     if (this.catalogJanitorChore != null) {
1214       this.catalogJanitorChore.cancel(true);
1215     }
1216     if (this.clusterStatusPublisherChore != null){
1217       clusterStatusPublisherChore.cancel(true);
1218     }
1219     if (this.mobCompactThread != null) {
1220       this.mobCompactThread.close();
1221     }
1222
1223     if (this.periodicDoMetricsChore != null) {
1224       periodicDoMetricsChore.cancel();
1225     }
1226   }
1227
1228   /**
1229    * @return Get remote side's InetAddress
1230    */
1231   InetAddress getRemoteInetAddress(final int port,
1232       final long serverStartCode) throws UnknownHostException {
1233     // Do it out here in its own little method so can fake an address when
1234     // mocking up in tests.
1235     InetAddress ia = RpcServer.getRemoteIp();
1236
1237     // The call could be from the local regionserver,
1238     // in which case, there is no remote address.
1239     if (ia == null && serverStartCode == startcode) {
1240       InetSocketAddress isa = rpcServices.getSocketAddress();
1241       if (isa != null && isa.getPort() == port) {
1242         ia = isa.getAddress();
1243       }
1244     }
1245     return ia;
1246   }
1247
1248   /**
1249    * @return Maximum time we should run balancer for
1250    */
1251   private int getBalancerCutoffTime() {
1252     int balancerCutoffTime = getConfiguration().getInt("hbase.balancer.max.balancing", -1);
1253     if (balancerCutoffTime == -1) {
1254       // if cutoff time isn't set, defaulting it to period time
1255       int balancerPeriod = getConfiguration().getInt("hbase.balancer.period", 300000);
1256       balancerCutoffTime = balancerPeriod;
1257     }
1258     return balancerCutoffTime;
1259   }
1260
  /**
   * Run one balancer pass in non-forced mode; equivalent to {@code balance(false)}.
   * @return true if a balance pass completed, false if it was skipped
   * @throws IOException if balancing fails
   */
  public boolean balance() throws IOException {
    return balance(false);
  }
1264
  /**
   * Run one pass of the load balancer unless preconditions forbid it.
   * <p>
   * Returns false (skipping the run) when the master is not initialized, the
   * balancer switch is off, regions are in transition (unless {@code force} is set
   * and meta is not among them), or dead servers are still being processed.
   * Otherwise computes per-table plans and executes them until the projected time
   * for the next plan would exceed the configured cutoff.
   * @param force run even if (non-meta) regions are in transition
   * @return true if a balance pass completed (possibly with zero plans), false if skipped
   * @throws IOException if balancing fails
   */
  public boolean balance(boolean force) throws IOException {
    // if master not initialized, don't run balancer.
    if (!isInitialized()) {
      LOG.debug("Master has not been initialized, don't run balancer.");
      return false;
    }
    // Do this call outside of synchronized block.
    int maximumBalanceTime = getBalancerCutoffTime();
    synchronized (this.balancer) {
      // If balance not true, don't run balancer.
      if (!this.loadBalancerTracker.isBalancerOn()) return false;
      // Only allow one balance run at at time.
      if (this.assignmentManager.getRegionStates().isRegionsInTransition()) {
        Set<RegionState> regionsInTransition =
          this.assignmentManager.getRegionStates().getRegionsInTransition();
        // if hbase:meta region is in transition, result of assignment cannot be recorded
        // ignore the force flag in that case
        boolean metaInTransition = assignmentManager.getRegionStates().isMetaRegionInTransition();
        String prefix = force && !metaInTransition ? "R" : "Not r";
        LOG.debug(prefix + "unning balancer because " + regionsInTransition.size() +
          " region(s) in transition: " + org.apache.commons.lang.StringUtils.
            abbreviate(regionsInTransition.toString(), 256));
        if (!force || metaInTransition) return false;
      }
      if (this.serverManager.areDeadServersInProgress()) {
        LOG.debug("Not running balancer because processing dead regionserver(s): " +
          this.serverManager.getDeadServers());
        return false;
      }

      // Give coprocessors a chance to veto the run.
      if (this.cpHost != null) {
        try {
          if (this.cpHost.preBalance()) {
            LOG.debug("Coprocessor bypassing balancer request");
            return false;
          }
        } catch (IOException ioe) {
          LOG.error("Error invoking master coprocessor preBalance()", ioe);
          return false;
        }
      }

      Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable =
        this.assignmentManager.getRegionStates().getAssignmentsByTable();

      List<RegionPlan> plans = new ArrayList<RegionPlan>();

      //Give the balancer the current cluster state.
      this.balancer.setClusterStatus(getClusterStatus());
      // Collect plans table by table; a null partial result means "already balanced".
      for (Entry<TableName, Map<ServerName, List<HRegionInfo>>> e : assignmentsByTable.entrySet()) {
        List<RegionPlan> partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue());
        if (partialPlans != null) plans.addAll(partialPlans);
      }

      long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
      int rpCount = 0;  // number of RegionPlans balanced so far
      long totalRegPlanExecTime = 0;
      if (plans != null && !plans.isEmpty()) {
        for (RegionPlan plan: plans) {
          LOG.info("balance " + plan);
          long balStartTime = System.currentTimeMillis();
          //TODO: bulk assign
          this.assignmentManager.balance(plan);
          totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;
          rpCount++;
          if (rpCount < plans.size() &&
              // if performing next balance exceeds cutoff time, exit the loop
              (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
            //TODO: After balance, there should not be a cutoff time (keeping it as
            // a security net for now)
            LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
              maximumBalanceTime);
            break;
          }
        }
      }
      if (this.cpHost != null) {
        try {
          // Report only the plans that actually ran if we broke out early.
          this.cpHost.postBalance(rpCount < plans.size() ? plans.subList(0, rpCount) : plans);
        } catch (IOException ioe) {
          // balancing already succeeded so don't change the result
          LOG.error("Error invoking master coprocessor postBalance()", ioe);
        }
      }
    }
    // If LoadBalancer did not generate any plans, it means the cluster is already balanced.
    // Return true indicating a success.
    return true;
  }
1354
  /** @return the region normalizer implementation in use; exposed for tests. */
  @Override
  @VisibleForTesting
  public RegionNormalizer getRegionNormalizer() {
    return this.normalizer;
  }
1360
1361   /**
1362    * Perform normalization of cluster (invoked by {@link RegionNormalizerChore}).
1363    *
1364    * @return true if normalization step was performed successfully, false otherwise
1365    *    (specifically, if HMaster hasn't been initialized properly or normalization
1366    *    is globally disabled)
1367    */
1368   public boolean normalizeRegions() throws IOException {
1369     if (!isInitialized()) {
1370       LOG.debug("Master has not been initialized, don't run region normalizer.");
1371       return false;
1372     }
1373
1374     if (!this.regionNormalizerTracker.isNormalizerOn()) {
1375       LOG.debug("Region normalization is disabled, don't run region normalizer.");
1376       return false;
1377     }
1378
1379     synchronized (this.normalizer) {
1380       // Don't run the normalizer concurrently
1381       List<TableName> allEnabledTables = new ArrayList<>(
1382         this.tableStateManager.getTablesInStates(TableState.State.ENABLED));
1383
1384       Collections.shuffle(allEnabledTables);
1385
1386       for (TableName table : allEnabledTables) {
1387         HTableDescriptor tblDesc = getTableDescriptors().get(table);
1388         if (table.isSystemTable() || (tblDesc != null &&
1389             !tblDesc.isNormalizationEnabled())) {
1390           LOG.debug("Skipping normalization for table: " + table + ", as it's either system"
1391               + " table or doesn't have auto normalization turned on");
1392           continue;
1393         }
1394         List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table);
1395         if (plans != null) {
1396           for (NormalizationPlan plan : plans) {
1397             plan.execute(clusterConnection.getAdmin());
1398             if (plan.getType() == PlanType.SPLIT) {
1399               splitPlanCount++;
1400             } else if (plan.getType() == PlanType.MERGE) {
1401               mergePlanCount++;
1402             }
1403           }
1404         }
1405       }
1406     }
1407     // If Region did not generate any plans, it means the cluster is already balanced.
1408     // Return true indicating a success.
1409     return true;
1410   }
1411
1412   /**
1413    * @return Client info for use as prefix on an audit log string; who did an action
1414    */
1415   String getClientIdAuditPrefix() {
1416     return "Client=" + RpcServer.getRequestUserName() + "/" + RpcServer.getRemoteAddress();
1417   }
1418
1419   /**
1420    * Switch for the background CatalogJanitor thread.
1421    * Used for testing.  The thread will continue to run.  It will just be a noop
1422    * if disabled.
1423    * @param b If false, the catalog janitor won't do anything.
1424    */
1425   public void setCatalogJanitorEnabled(final boolean b) {
1426     this.catalogJanitorChore.setEnabled(b);
1427   }
1428
  /**
   * Submit an asynchronous merge of the two given regions to the executor service.
   * @param region_a first region to merge
   * @param region_b second region to merge
   * @param forcible forwarded to {@link DispatchMergingRegionHandler}; presumably
   *   forces the merge even when preconditions are not met -- confirm in handler
   * @param user the requesting user, forwarded to the handler
   * @throws IOException if the master has not finished initialization
   */
  @Override
  public void dispatchMergingRegions(final HRegionInfo region_a,
      final HRegionInfo region_b, final boolean forcible, final User user) throws IOException {
    checkInitialized();
    this.service.submit(new DispatchMergingRegionHandler(this,
      this.catalogJanitorChore, region_a, region_b, forcible, user));
  }
1436
  /**
   * Move a region to a destination server.  When no destination is given, the
   * balancer picks one at random.  Silently returns (with a debug log) when no
   * plan can be produced or the region is already on the chosen server.
   * @param encodedRegionName encoded name of the region to move
   * @param destServerName destination server name bytes; null/empty means
   *   "choose a server at random"
   * @throws HBaseIOException on failure (other IOExceptions are wrapped)
   */
  void move(final byte[] encodedRegionName,
      final byte[] destServerName) throws HBaseIOException {
    RegionState regionState = assignmentManager.getRegionStates().
      getRegionState(Bytes.toString(encodedRegionName));

    HRegionInfo hri;
    if (regionState != null) {
      hri = regionState.getRegion();
    } else {
      throw new UnknownRegionException(Bytes.toStringBinary(encodedRegionName));
    }

    ServerName dest;
    if (destServerName == null || destServerName.length == 0) {
      LOG.info("Passed destination servername is null/empty so " +
        "choosing a server at random");
      // Candidate list built relative to the region's current server --
      // presumably excluding it; confirm in ServerManager.
      final List<ServerName> destServers = this.serverManager.createDestinationServersList(
        regionState.getServerName());
      dest = balancer.randomAssignment(hri, destServers);
      if (dest == null) {
        LOG.debug("Unable to determine a plan to assign " + hri);
        return;
      }
    } else {
      // An explicit destination still goes through the balancer so it can veto it.
      ServerName candidate = ServerName.valueOf(Bytes.toString(destServerName));
      dest = balancer.randomAssignment(hri, Lists.newArrayList(candidate));
      if (dest == null) {
        LOG.debug("Unable to determine a plan to assign " + hri);
        return;
      }
      if (dest.equals(serverName) && balancer instanceof BaseLoadBalancer
          && !((BaseLoadBalancer)balancer).shouldBeOnMaster(hri)) {
        // To avoid unnecessary region moving later by balancer. Don't put user
        // regions on master. Regions on master could be put on other region
        // server intentionally by test however.
        LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
          + " to avoid unnecessary region moving later by load balancer,"
          + " because it should not be on master");
        return;
      }
    }

    if (dest.equals(regionState.getServerName())) {
      LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
        + " because region already assigned to the same server " + dest + ".");
      return;
    }

    // Now we can do the move
    RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest);

    try {
      checkInitialized();
      if (this.cpHost != null) {
        // A coprocessor returning true vetoes the move.
        if (this.cpHost.preMove(hri, rp.getSource(), rp.getDestination())) {
          return;
        }
      }
      // warmup the region on the destination before initiating the move. this call
      // is synchronous and takes some time. doing it before the source region gets
      // closed
      serverManager.sendRegionWarmup(rp.getDestination(), hri);

      LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
      this.assignmentManager.balance(rp);
      if (this.cpHost != null) {
        this.cpHost.postMove(hri, rp.getSource(), rp.getDestination());
      }
    } catch (IOException ioe) {
      // Preserve HBaseIOException subtypes; wrap everything else.
      if (ioe instanceof HBaseIOException) {
        throw (HBaseIOException)ioe;
      }
      throw new HBaseIOException(ioe);
    }
  }
1512
1513   @Override
1514   public long createTable(
1515       final HTableDescriptor hTableDescriptor,
1516       final byte [][] splitKeys,
1517       final long nonceGroup,
1518       final long nonce) throws IOException {
1519     if (isStopped()) {
1520       throw new MasterNotRunningException();
1521     }
1522     checkInitialized();
1523     String namespace = hTableDescriptor.getTableName().getNamespaceAsString();
1524     this.clusterSchemaService.getNamespace(namespace);
1525
1526     HRegionInfo[] newRegions = ModifyRegionUtils.createHRegionInfos(hTableDescriptor, splitKeys);
1527     checkInitialized();
1528     sanityCheckTableDescriptor(hTableDescriptor);
1529     if (cpHost != null) {
1530       cpHost.preCreateTable(hTableDescriptor, newRegions);
1531     }
1532     LOG.info(getClientIdAuditPrefix() + " create " + hTableDescriptor);
1533
1534     // TODO: We can handle/merge duplicate requests, and differentiate the case of
1535     //       TableExistsException by saying if the schema is the same or not.
1536     ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
1537     long procId = this.procedureExecutor.submitProcedure(
1538       new CreateTableProcedure(
1539         procedureExecutor.getEnvironment(), hTableDescriptor, newRegions, latch),
1540       nonceGroup,
1541       nonce);
1542     latch.await();
1543
1544     if (cpHost != null) {
1545       cpHost.postCreateTable(hTableDescriptor, newRegions);
1546     }
1547
1548     return procId;
1549   }
1550
1551   /**
1552    * Checks whether the table conforms to some sane limits, and configured
1553    * values (compression, etc) work. Throws an exception if something is wrong.
1554    * @throws IOException
1555    */
1556   private void sanityCheckTableDescriptor(final HTableDescriptor htd) throws IOException {
1557     final String CONF_KEY = "hbase.table.sanity.checks";
1558     boolean logWarn = false;
1559     if (!conf.getBoolean(CONF_KEY, true)) {
1560       logWarn = true;
1561     }
1562     String tableVal = htd.getConfigurationValue(CONF_KEY);
1563     if (tableVal != null && !Boolean.valueOf(tableVal)) {
1564       logWarn = true;
1565     }
1566
1567     // check max file size
1568     long maxFileSizeLowerLimit = 2 * 1024 * 1024L; // 2M is the default lower limit
1569     long maxFileSize = htd.getMaxFileSize();
1570     if (maxFileSize < 0) {
1571       maxFileSize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, maxFileSizeLowerLimit);
1572     }
1573     if (maxFileSize < conf.getLong("hbase.hregion.max.filesize.limit", maxFileSizeLowerLimit)) {
1574       String message = "MAX_FILESIZE for table descriptor or "
1575           + "\"hbase.hregion.max.filesize\" (" + maxFileSize
1576           + ") is too small, which might cause over splitting into unmanageable "
1577           + "number of regions.";
1578       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1579     }
1580
1581     // check flush size
1582     long flushSizeLowerLimit = 1024 * 1024L; // 1M is the default lower limit
1583     long flushSize = htd.getMemStoreFlushSize();
1584     if (flushSize < 0) {
1585       flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeLowerLimit);
1586     }
1587     if (flushSize < conf.getLong("hbase.hregion.memstore.flush.size.limit", flushSizeLowerLimit)) {
1588       String message = "MEMSTORE_FLUSHSIZE for table descriptor or "
1589           + "\"hbase.hregion.memstore.flush.size\" ("+flushSize+") is too small, which might cause"
1590           + " very frequent flushing.";
1591       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1592     }
1593
1594     // check that coprocessors and other specified plugin classes can be loaded
1595     try {
1596       checkClassLoading(conf, htd);
1597     } catch (Exception ex) {
1598       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, ex.getMessage(), null);
1599     }
1600
1601     // check compression can be loaded
1602     try {
1603       checkCompression(htd);
1604     } catch (IOException e) {
1605       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
1606     }
1607
1608     // check encryption can be loaded
1609     try {
1610       checkEncryption(conf, htd);
1611     } catch (IOException e) {
1612       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
1613     }
1614     // Verify compaction policy
1615     try{
1616       checkCompactionPolicy(conf, htd);
1617     } catch(IOException e){
1618       warnOrThrowExceptionForFailure(false, CONF_KEY, e.getMessage(), e);
1619     }
1620     // check that we have at least 1 CF
1621     if (htd.getColumnFamilies().length == 0) {
1622       String message = "Table should have at least one column family.";
1623       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1624     }
1625
1626     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1627       if (hcd.getTimeToLive() <= 0) {
1628         String message = "TTL for column family " + hcd.getNameAsString() + " must be positive.";
1629         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1630       }
1631
1632       // check blockSize
1633       if (hcd.getBlocksize() < 1024 || hcd.getBlocksize() > 16 * 1024 * 1024) {
1634         String message = "Block size for column family " + hcd.getNameAsString()
1635             + "  must be between 1K and 16MB.";
1636         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1637       }
1638
1639       // check versions
1640       if (hcd.getMinVersions() < 0) {
1641         String message = "Min versions for column family " + hcd.getNameAsString()
1642           + "  must be positive.";
1643         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1644       }
1645       // max versions already being checked
1646
1647       // HBASE-13776 Setting illegal versions for HColumnDescriptor
1648       //  does not throw IllegalArgumentException
1649       // check minVersions <= maxVerions
1650       if (hcd.getMinVersions() > hcd.getMaxVersions()) {
1651         String message = "Min versions for column family " + hcd.getNameAsString()
1652             + " must be less than the Max versions.";
1653         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1654       }
1655
1656       // check replication scope
1657       if (hcd.getScope() < 0) {
1658         String message = "Replication scope for column family "
1659           + hcd.getNameAsString() + "  must be positive.";
1660         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1661       }
1662
1663       // check data replication factor, it can be 0(default value) when user has not explicitly
1664       // set the value, in this case we use default replication factor set in the file system.
1665       if (hcd.getDFSReplication() < 0) {
1666         String message = "HFile Replication for column family " + hcd.getNameAsString()
1667             + "  must be greater than zero.";
1668         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1669       }
1670
1671       // TODO: should we check coprocessors and encryption ?
1672     }
1673   }
1674
  /**
   * Validate FIFO-compaction prerequisites for every column family that uses
   * {@link FIFOCompactionPolicy}: a non-default TTL, MIN_VERSIONS == 0, and a
   * blocking store-file count of at least 1000.  Families using other policies
   * are ignored.
   * @throws IOException if a family configured for FIFO compaction violates
   *   any of the above requirements
   */
  private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd)
      throws IOException {
    // FIFO compaction has some requirements
    // Actually FCP ignores periodic major compactions
    String className =
        htd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
    if (className == null) {
      className =
          conf.get(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
            ExploringCompactionPolicy.class.getName());
    }

    int blockingFileCount = HStore.DEFAULT_BLOCKING_STOREFILE_COUNT;
    // NOTE(review): Integer.parseInt below throws an unchecked
    // NumberFormatException on a malformed value -- consider whether that
    // should surface as an IOException like the other failures here.
    String sv = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
    if (sv != null) {
      blockingFileCount = Integer.parseInt(sv);
    } else {
      blockingFileCount = conf.getInt(HStore.BLOCKING_STOREFILES_KEY, blockingFileCount);
    }

    for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
      // Family-level policy overrides the table/site-level default.
      String compactionPolicy =
          hcd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
      if (compactionPolicy == null) {
        compactionPolicy = className;
      }
      if (!compactionPolicy.equals(FIFOCompactionPolicy.class.getName())) {
        continue;
      }
      // FIFOCompaction
      String message = null;

      // 1. Check TTL
      if (hcd.getTimeToLive() == HColumnDescriptor.DEFAULT_TTL) {
        message = "Default TTL is not supported for FIFO compaction";
        throw new IOException(message);
      }

      // 2. Check min versions
      if (hcd.getMinVersions() > 0) {
        message = "MIN_VERSION > 0 is not supported for FIFO compaction";
        throw new IOException(message);
      }

      // 3. blocking file count
      String sbfc = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
      if (sbfc != null) {
        blockingFileCount = Integer.parseInt(sbfc);
      }
      if (blockingFileCount < 1000) {
        message =
            "blocking file count '" + HStore.BLOCKING_STOREFILES_KEY + "' " + blockingFileCount
                + " is below recommended minimum of 1000";
        throw new IOException(message);
      }
    }
  }
1732
1733   // HBASE-13350 - Helper method to log warning on sanity check failures if checks disabled.
1734   private static void warnOrThrowExceptionForFailure(boolean logWarn, String confKey,
1735       String message, Exception cause) throws IOException {
1736     if (!logWarn) {
1737       throw new DoNotRetryIOException(message + " Set " + confKey +
1738           " to false at conf or table descriptor if you want to bypass sanity checks", cause);
1739     }
1740     LOG.warn(message);
1741   }
1742
  /**
   * Register this master as a backup master in ZooKeeper and start a daemon
   * thread that competes to become the active master, running
   * finishActiveMasterInitialization once (and if) it wins.
   * @param infoPort the info-server port published with our address
   * @throws KeeperException on ZooKeeper errors while creating the backup znode
   */
  private void startActiveMasterManager(int infoPort) throws KeeperException {
    String backupZNode = ZKUtil.joinZNode(
      zooKeeper.backupMasterAddressesZNode, serverName.toString());
    /*
    * Add a ZNode for ourselves in the backup master directory since we
    * may not become the active master. If so, we want the actual active
    * master to know we are backup masters, so that it won't assign
    * regions to us if so configured.
    *
    * If we become the active master later, ActiveMasterManager will delete
    * this node explicitly.  If we crash before then, ZooKeeper will delete
    * this node for us since it is ephemeral.
    */
    LOG.info("Adding backup master ZNode " + backupZNode);
    if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode,
        serverName, infoPort)) {
      LOG.warn("Failed create of " + backupZNode + " by " + serverName);
    }

    activeMasterManager.setInfoPort(infoPort);
    // Start a thread to try to become the active master, so we won't block here
    Threads.setDaemonThreadRunning(new Thread(new Runnable() {
      @Override
      public void run() {
        int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT,
          HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
        // If we're a backup master, stall until a primary writes its address
        if (conf.getBoolean(HConstants.MASTER_TYPE_BACKUP,
          HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
          LOG.debug("HMaster started in backup mode. "
            + "Stalling until master znode is written.");
          // This will only be a minute or so while the cluster starts up,
          // so don't worry about setting watches on the parent znode
          while (!activeMasterManager.hasActiveMaster()) {
            LOG.debug("Waiting for master address ZNode to be written "
              + "(Also watching cluster state node)");
            Threads.sleep(timeout);
          }
        }
        MonitoredTask status = TaskMonitor.get().createStatus("Master startup");
        status.setDescription("Master startup");
        try {
          // Blocks until we win the active-master race (or give up).
          if (activeMasterManager.blockUntilBecomingActiveMaster(timeout, status)) {
            finishActiveMasterInitialization(status);
          }
        } catch (Throwable t) {
          status.setStatus("Failed to become active: " + t.getMessage());
          LOG.fatal("Failed to become active master", t);
          // HBASE-5680: Likely hadoop23 vs hadoop 20.x/1.x incompatibility
          if (t instanceof NoClassDefFoundError &&
            t.getMessage()
              .contains("org/apache/hadoop/hdfs/protocol/HdfsConstants$SafeModeAction")) {
            // improved error message for this special case
            abort("HBase is having a problem with its Hadoop jars.  You may need to "
              + "recompile HBase against Hadoop version "
              + org.apache.hadoop.util.VersionInfo.getVersion()
              + " or change your hadoop jars to start properly", t);
          } else {
            abort("Unhandled exception. Starting shutdown.", t);
          }
        } finally {
          status.cleanup();
        }
      }
    }, getServerName().toShortString() + ".activeMasterManager"));
  }
1809
1810   private void checkCompression(final HTableDescriptor htd)
1811   throws IOException {
1812     if (!this.masterCheckCompression) return;
1813     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1814       checkCompression(hcd);
1815     }
1816   }
1817
1818   private void checkCompression(final HColumnDescriptor hcd)
1819   throws IOException {
1820     if (!this.masterCheckCompression) return;
1821     CompressionTest.testCompression(hcd.getCompressionType());
1822     CompressionTest.testCompression(hcd.getCompactionCompressionType());
1823   }
1824
1825   private void checkEncryption(final Configuration conf, final HTableDescriptor htd)
1826   throws IOException {
1827     if (!this.masterCheckEncryption) return;
1828     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1829       checkEncryption(conf, hcd);
1830     }
1831   }
1832
1833   private void checkEncryption(final Configuration conf, final HColumnDescriptor hcd)
1834   throws IOException {
1835     if (!this.masterCheckEncryption) return;
1836     EncryptionTest.testEncryption(conf, hcd.getEncryptionType(), hcd.getEncryptionKey());
1837   }
1838
  /**
   * Verify the split-policy and table-coprocessor classes named by the
   * descriptor can actually be loaded, so a bad class name fails at DDL time
   * rather than at region open.
   */
  private void checkClassLoading(final Configuration conf, final HTableDescriptor htd)
  throws IOException {
    RegionSplitPolicy.getSplitPolicyClass(htd, conf);
    RegionCoprocessorHost.testTableCoprocessorAttrs(conf, htd);
  }
1844
  /**
   * @return true if the given table is the hbase:meta catalog table
   */
  private static boolean isCatalogTable(final TableName tableName) {
    return tableName.equals(TableName.META_TABLE_NAME);
  }
1848
1849   @Override
1850   public long deleteTable(
1851       final TableName tableName,
1852       final long nonceGroup,
1853       final long nonce) throws IOException {
1854     checkInitialized();
1855     if (cpHost != null) {
1856       cpHost.preDeleteTable(tableName);
1857     }
1858     LOG.info(getClientIdAuditPrefix() + " delete " + tableName);
1859
1860     // TODO: We can handle/merge duplicate request
1861     ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
1862     long procId = this.procedureExecutor.submitProcedure(
1863       new DeleteTableProcedure(procedureExecutor.getEnvironment(), tableName, latch),
1864       nonceGroup,
1865       nonce);
1866     latch.await();
1867
1868     if (cpHost != null) {
1869       cpHost.postDeleteTable(tableName);
1870     }
1871
1872     return procId;
1873   }
1874
  /**
   * Truncate the named table.  Runs synchronously: waits for the
   * {@link TruncateTableProcedure} to complete before returning.
   * @param preserveSplits forwarded to the procedure; presumably keeps the
   *   existing split points when true -- confirm in TruncateTableProcedure
   * @return the id of the completed procedure
   */
  @Override
  public long truncateTable(
      final TableName tableName,
      final boolean preserveSplits,
      final long nonceGroup,
      final long nonce) throws IOException {
    checkInitialized();
    if (cpHost != null) {
      cpHost.preTruncateTable(tableName);
    }
    LOG.info(getClientIdAuditPrefix() + " truncate " + tableName);

    long procId = this.procedureExecutor.submitProcedure(
      new TruncateTableProcedure(procedureExecutor.getEnvironment(), tableName, preserveSplits),
      nonceGroup,
      nonce);
    ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);

    if (cpHost != null) {
      cpHost.postTruncateTable(tableName);
    }
    return procId;
  }
1898
1899   @Override
1900   public long addColumn(
1901       final TableName tableName,
1902       final HColumnDescriptor columnDescriptor,
1903       final long nonceGroup,
1904       final long nonce)
1905       throws IOException {
1906     checkInitialized();
1907     checkCompression(columnDescriptor);
1908     checkEncryption(conf, columnDescriptor);
1909     if (cpHost != null) {
1910       if (cpHost.preAddColumn(tableName, columnDescriptor)) {
1911         return -1;
1912       }
1913     }
1914     // Execute the operation synchronously - wait for the operation to complete before continuing.
1915     long procId = this.procedureExecutor.submitProcedure(
1916       new AddColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, columnDescriptor),
1917       nonceGroup,
1918       nonce);
1919     ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1920     if (cpHost != null) {
1921       cpHost.postAddColumn(tableName, columnDescriptor);
1922     }
1923     return procId;
1924   }
1925
  /**
   * Modify an existing column family.  Runs synchronously: waits for the
   * {@link ModifyColumnFamilyProcedure} to complete before returning.
   * @return the procedure id, or -1 if a coprocessor vetoed the operation
   */
  @Override
  public long modifyColumn(
      final TableName tableName,
      final HColumnDescriptor descriptor,
      final long nonceGroup,
      final long nonce)
      throws IOException {
    checkInitialized();
    // Validate the modified family's compression and encryption settings up front.
    checkCompression(descriptor);
    checkEncryption(conf, descriptor);
    if (cpHost != null) {
      if (cpHost.preModifyColumn(tableName, descriptor)) {
        return -1;
      }
    }
    LOG.info(getClientIdAuditPrefix() + " modify " + descriptor);

    // Execute the operation synchronously - wait for the operation to complete before continuing.
    long procId = this.procedureExecutor.submitProcedure(
      new ModifyColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, descriptor),
      nonceGroup,
      nonce);
    ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);

    if (cpHost != null) {
      cpHost.postModifyColumn(tableName, descriptor);
    }
    return procId;
  }
1955
  /**
   * Delete a column family from a table.  Runs synchronously: waits for the
   * {@link DeleteColumnFamilyProcedure} to complete before returning.
   * @return the procedure id, or -1 if a coprocessor vetoed the operation
   */
  @Override
  public long deleteColumn(
      final TableName tableName,
      final byte[] columnName,
      final long nonceGroup,
      final long nonce)
      throws IOException {
    checkInitialized();
    if (cpHost != null) {
      if (cpHost.preDeleteColumn(tableName, columnName)) {
        return -1;
      }
    }
    LOG.info(getClientIdAuditPrefix() + " delete " + Bytes.toString(columnName));

    // Execute the operation synchronously - wait for the operation to complete before continuing.
    long procId = this.procedureExecutor.submitProcedure(
      new DeleteColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, columnName),
      nonceGroup,
      nonce);
    ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);

    if (cpHost != null) {
      cpHost.postDeleteColumn(tableName, columnName);
    }
    return procId;
  }
1983
  /**
   * Enable the named table.  Runs asynchronously: the client tracks progress
   * via the returned procedure id.  Blocks only until the procedure is
   * "prepared" (table locked, table state set).
   * @return the procedure id
   */
  @Override
  public long enableTable(
      final TableName tableName,
      final long nonceGroup,
      final long nonce) throws IOException {
    checkInitialized();
    if (cpHost != null) {
      cpHost.preEnableTable(tableName);
    }
    LOG.info(getClientIdAuditPrefix() + " enable " + tableName);

    // Execute the operation asynchronously - client will check the progress of the operation
    final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
    long procId = this.procedureExecutor.submitProcedure(
      new EnableTableProcedure(procedureExecutor.getEnvironment(), tableName, false, prepareLatch),
      nonceGroup,
      nonce);
    // Before returning to client, we want to make sure that the table is prepared to be
    // enabled (the table is locked and the table state is set).
    //
    // Note: if the procedure throws exception, we will catch it and rethrow.
    prepareLatch.await();

    if (cpHost != null) {
      cpHost.postEnableTable(tableName);
    }

    return procId;
  }
2013
  /**
   * Disable the named table.  Runs asynchronously: the client tracks progress
   * via the returned procedure id.  Blocks only until the procedure is
   * "prepared" (table locked, table state set).
   * @return the procedure id
   */
  @Override
  public long disableTable(
      final TableName tableName,
      final long nonceGroup,
      final long nonce) throws IOException {
    checkInitialized();
    if (cpHost != null) {
      cpHost.preDisableTable(tableName);
    }
    LOG.info(getClientIdAuditPrefix() + " disable " + tableName);

    // Execute the operation asynchronously - client will check the progress of the operation
    final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
    long procId = this.procedureExecutor.submitProcedure(
      new DisableTableProcedure(procedureExecutor.getEnvironment(), tableName, false, prepareLatch),
      nonceGroup,
      nonce);
    // Before returning to client, we want to make sure that the table is prepared to be
    // disabled (the table is locked and the table state is set).
    //
    // Note: if the procedure throws exception, we will catch it and rethrow.
    prepareLatch.await();

    if (cpHost != null) {
      cpHost.postDisableTable(tableName);
    }

    return procId;
  }
2044
2045   /**
2046    * Return the region and current deployment for the region containing
2047    * the given row. If the region cannot be found, returns null. If it
2048    * is found, but not currently deployed, the second element of the pair
2049    * may be null.
2050    */
2051   @VisibleForTesting // Used by TestMaster.
2052   Pair<HRegionInfo, ServerName> getTableRegionForRow(
2053       final TableName tableName, final byte [] rowKey)
2054   throws IOException {
2055     final AtomicReference<Pair<HRegionInfo, ServerName>> result =
2056       new AtomicReference<Pair<HRegionInfo, ServerName>>(null);
2057
2058     MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
2059         @Override
2060         public boolean visit(Result data) throws IOException {
2061           if (data == null || data.size() <= 0) {
2062             return true;
2063           }
2064           Pair<HRegionInfo, ServerName> pair =
2065               new Pair(MetaTableAccessor.getHRegionInfo(data),
2066                   MetaTableAccessor.getServerName(data,0));
2067           if (pair == null) {
2068             return false;
2069           }
2070           if (!pair.getFirst().getTable().equals(tableName)) {
2071             return false;
2072           }
2073           result.set(pair);
2074           return true;
2075         }
2076     };
2077
2078     MetaTableAccessor.scanMeta(clusterConnection, visitor, tableName, rowKey, 1);
2079     return result.get();
2080   }
2081
  /**
   * Modify the descriptor of an existing table.  Runs synchronously: waits for
   * the {@link ModifyTableProcedure} to complete before returning.
   * @return the id of the completed procedure
   */
  @Override
  public long modifyTable(
      final TableName tableName,
      final HTableDescriptor descriptor,
      final long nonceGroup,
      final long nonce)
      throws IOException {
    checkInitialized();
    // New descriptor must pass the same sanity checks as at create time.
    sanityCheckTableDescriptor(descriptor);
    if (cpHost != null) {
      cpHost.preModifyTable(tableName, descriptor);
    }

    LOG.info(getClientIdAuditPrefix() + " modify " + tableName);

    // Execute the operation synchronously - wait for the operation completes before continuing.
    long procId = this.procedureExecutor.submitProcedure(
      new ModifyTableProcedure(procedureExecutor.getEnvironment(), descriptor),
      nonceGroup,
      nonce);

    ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);

    if (cpHost != null) {
      cpHost.postModifyTable(tableName, descriptor);
    }

    return procId;
  }
2111
  /**
   * Precondition checks before a table may be modified: catalog tables can
   * never be modified, the table must exist in meta, and it must currently be
   * in the DISABLED state.
   * @throws IOException if the table is a catalog table
   * @throws TableNotFoundException if the table does not exist in meta
   * @throws TableNotDisabledException if the table is not disabled
   */
  @Override
  public void checkTableModifiable(final TableName tableName)
      throws IOException, TableNotFoundException, TableNotDisabledException {
    if (isCatalogTable(tableName)) {
      throw new IOException("Can't modify catalog tables");
    }
    if (!MetaTableAccessor.tableExists(getConnection(), tableName)) {
      throw new TableNotFoundException(tableName);
    }
    if (!getTableStateManager().isTableState(tableName, TableState.State.DISABLED)) {
      throw new TableNotDisabledException(tableName);
    }
  }
2125
2126   /**
2127    * @return cluster status
2128    */
2129   public ClusterStatus getClusterStatus() throws InterruptedIOException {
2130     // Build Set of backup masters from ZK nodes
2131     List<String> backupMasterStrings;
2132     try {
2133       backupMasterStrings = ZKUtil.listChildrenNoWatch(this.zooKeeper,
2134         this.zooKeeper.backupMasterAddressesZNode);
2135     } catch (KeeperException e) {
2136       LOG.warn(this.zooKeeper.prefix("Unable to list backup servers"), e);
2137       backupMasterStrings = null;
2138     }
2139
2140     List<ServerName> backupMasters = null;
2141     if (backupMasterStrings != null && !backupMasterStrings.isEmpty()) {
2142       backupMasters = new ArrayList<ServerName>(backupMasterStrings.size());
2143       for (String s: backupMasterStrings) {
2144         try {
2145           byte [] bytes;
2146           try {
2147             bytes = ZKUtil.getData(this.zooKeeper, ZKUtil.joinZNode(
2148                 this.zooKeeper.backupMasterAddressesZNode, s));
2149           } catch (InterruptedException e) {
2150             throw new InterruptedIOException();
2151           }
2152           if (bytes != null) {
2153             ServerName sn;
2154             try {
2155               sn = ServerName.parseFrom(bytes);
2156             } catch (DeserializationException e) {
2157               LOG.warn("Failed parse, skipping registering backup server", e);
2158               continue;
2159             }
2160             backupMasters.add(sn);
2161           }
2162         } catch (KeeperException e) {
2163           LOG.warn(this.zooKeeper.prefix("Unable to get information about " +
2164                    "backup servers"), e);
2165         }
2166       }
2167       Collections.sort(backupMasters, new Comparator<ServerName>() {
2168         @Override
2169         public int compare(ServerName s1, ServerName s2) {
2170           return s1.getServerName().compareTo(s2.getServerName());
2171         }});
2172     }
2173
2174     String clusterId = fileSystemManager != null ?
2175       fileSystemManager.getClusterId().toString() : null;
2176     Set<RegionState> regionsInTransition = assignmentManager != null ?
2177       assignmentManager.getRegionStates().getRegionsInTransition() : null;
2178     String[] coprocessors = cpHost != null ? getMasterCoprocessors() : null;
2179     boolean balancerOn = loadBalancerTracker != null ?
2180       loadBalancerTracker.isBalancerOn() : false;
2181     Map<ServerName, ServerLoad> onlineServers = null;
2182     Set<ServerName> deadServers = null;
2183     if (serverManager != null) {
2184       deadServers = serverManager.getDeadServers().copyServerNames();
2185       onlineServers = serverManager.getOnlineServers();
2186     }
2187     return new ClusterStatus(VersionInfo.getVersion(), clusterId,
2188       onlineServers, deadServers, serverName, backupMasters,
2189       regionsInTransition, coprocessors, balancerOn);
2190   }
2191
2192   /**
2193    * The set of loaded coprocessors is stored in a static set. Since it's
2194    * statically allocated, it does not require that HMaster's cpHost be
2195    * initialized prior to accessing it.
2196    * @return a String representation of the set of names of the loaded coprocessors.
2197    */
2198   public static String getLoadedCoprocessors() {
2199     return CoprocessorHost.getLoadedCoprocessors().toString();
2200   }
2201
2202   /**
2203    * @return timestamp in millis when HMaster was started.
2204    */
2205   public long getMasterStartTime() {
2206     return startcode;
2207   }
2208
2209   /**
2210    * @return timestamp in millis when HMaster became the active master.
2211    */
2212   public long getMasterActiveTime() {
2213     return masterActiveTime;
2214   }
2215
2216   public int getNumWALFiles() {
2217     return procedureStore != null ? procedureStore.getActiveLogs().size() : 0;
2218   }
2219
  /**
   * @return the WAL-backed procedure store used by the procedure executor.
   */
  public WALProcedureStore getWalProcedureStore() {
    return procedureStore;
  }
2223
2224   public int getRegionServerInfoPort(final ServerName sn) {
2225     RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
2226     if (info == null || info.getInfoPort() == 0) {
2227       return conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
2228         HConstants.DEFAULT_REGIONSERVER_INFOPORT);
2229     }
2230     return info.getInfoPort();
2231   }
2232
2233   public String getRegionServerVersion(final ServerName sn) {
2234     RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
2235     if (info != null && info.hasVersionInfo()) {
2236       return info.getVersionInfo().getVersion();
2237     }
2238     return "Unknown";
2239   }
2240
2241   /**
2242    * @return array of coprocessor SimpleNames.
2243    */
2244   public String[] getMasterCoprocessors() {
2245     Set<String> masterCoprocessors = getMasterCoprocessorHost().getCoprocessors();
2246     return masterCoprocessors.toArray(new String[masterCoprocessors.size()]);
2247   }
2248
2249   @Override
2250   public void abort(final String msg, final Throwable t) {
2251     if (isAborted() || isStopped()) {
2252       return;
2253     }
2254     if (cpHost != null) {
2255       // HBASE-4014: dump a list of loaded coprocessors.
2256       LOG.fatal("Master server abort: loaded coprocessors are: " +
2257           getLoadedCoprocessors());
2258     }
2259     if (t != null) LOG.fatal(msg, t);
2260     stopMaster();
2261   }
2262
  @Override
  public ZooKeeperWatcher getZooKeeper() {
    // ZooKeeper watcher shared by the master-side trackers.
    return zooKeeper;
  }
2267
  @Override
  public MasterCoprocessorHost getMasterCoprocessorHost() {
    // May be null until the coprocessor host has been created.
    return cpHost;
  }
2272
  @Override
  public MasterQuotaManager getMasterQuotaManager() {
    return quotaManager;
  }
2277
  @Override
  public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
    return procedureExecutor;
  }
2282
  @Override
  public ServerName getServerName() {
    return this.serverName;
  }
2287
  @Override
  public AssignmentManager getAssignmentManager() {
    return this.assignmentManager;
  }
2292
  /**
   * @return buffer holding fatal-error messages reported by region servers.
   */
  public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
    return rsFatals;
  }
2296
  /**
   * Shut down the whole cluster: run the coprocessor preShutdown hook, tell
   * the server manager to shut the cluster down, then mark the cluster as
   * down in ZooKeeper. Each step is best-effort; a failure in one step is
   * logged and does not prevent the following steps.
   */
  public void shutdown() {
    if (cpHost != null) {
      try {
        cpHost.preShutdown();
      } catch (IOException ioe) {
        // Best-effort hook; proceed with shutdown even if it fails.
        LOG.error("Error call master coprocessor preShutdown()", ioe);
      }
    }

    if (this.serverManager != null) {
      this.serverManager.shutdownCluster();
    }
    if (this.clusterStatusTracker != null){
      try {
        this.clusterStatusTracker.setClusterDown();
      } catch (KeeperException e) {
        LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
      }
    }
  }
2317
  /**
   * Stop this master (not the whole cluster). Runs the coprocessor
   * preStopMaster hook first; a hook failure is logged but does not
   * prevent the stop.
   */
  public void stopMaster() {
    if (cpHost != null) {
      try {
        cpHost.preStopMaster();
      } catch (IOException ioe) {
        LOG.error("Error call master coprocessor preStopMaster()", ioe);
      }
    }
    stop("Stopped by " + Thread.currentThread().getName());
  }
2328
2329   void checkServiceStarted() throws ServerNotRunningYetException {
2330     if (!serviceStarted) {
2331       throw new ServerNotRunningYetException("Server is not running yet");
2332     }
2333   }
2334
2335   void checkInitialized() throws PleaseHoldException, ServerNotRunningYetException {
2336     checkServiceStarted();
2337     if (!isInitialized()) throw new PleaseHoldException("Master is initializing");
2338   }
2339
2340   /**
2341    * Report whether this master is currently the active master or not.
2342    * If not active master, we are parked on ZK waiting to become active.
2343    *
2344    * This method is used for testing.
2345    *
2346    * @return true if active master, false if not.
2347    */
2348   @Override
2349   public boolean isActiveMaster() {
2350     return isActiveMaster;
2351   }
2352
2353   /**
2354    * Report whether this master has completed with its initialization and is
2355    * ready.  If ready, the master is also the active master.  A standby master
2356    * is never ready.
2357    *
2358    * This method is used for testing.
2359    *
2360    * @return true if master is ready to go, false if not.
2361    */
2362   @Override
2363   public boolean isInitialized() {
2364     return initialized.isReady();
2365   }
2366
  @VisibleForTesting
  public void setInitialized(boolean isInitialized) {
    // Test hook: toggles the "initialized" procedure event directly.
    procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized);
  }
2371
  /**
   * @return the procedure event signalled when master initialization completes.
   */
  public ProcedureEvent getInitializedEvent() {
    return initialized;
  }
2375
2376   /**
2377    * ServerCrashProcessingEnabled is set false before completing assignMeta to prevent processing
2378    * of crashed servers.
2379    * @return true if assignMeta has completed;
2380    */
2381   @Override
2382   public boolean isServerCrashProcessingEnabled() {
2383     return serverCrashProcessingEnabled.isReady();
2384   }
2385
  @VisibleForTesting
  public void setServerCrashProcessingEnabled(final boolean b) {
    // Test hook: toggles the server-crash-processing procedure event directly.
    procedureExecutor.getEnvironment().setEventReady(serverCrashProcessingEnabled, b);
  }
2390
  /**
   * @return the procedure event gating processing of crashed servers.
   */
  public ProcedureEvent getServerCrashProcessingEnabledEvent() {
    return serverCrashProcessingEnabled;
  }
2394
2395   /**
2396    * Report whether this master has started initialization and is about to do meta region assignment
2397    * @return true if master is in initialization &amp; about to assign hbase:meta regions
2398    */
2399   public boolean isInitializationStartsMetaRegionAssignment() {
2400     return this.initializationBeforeMetaAssignment;
2401   }
2402
2403   /**
2404    * Compute the average load across all region servers.
2405    * Currently, this uses a very naive computation - just uses the number of
2406    * regions being served, ignoring stats about number of requests.
2407    * @return the average load
2408    */
2409   public double getAverageLoad() {
2410     if (this.assignmentManager == null) {
2411       return 0;
2412     }
2413
2414     RegionStates regionStates = this.assignmentManager.getRegionStates();
2415     if (regionStates == null) {
2416       return 0;
2417     }
2418     return regionStates.getAverageLoad();
2419   }
2420
2421   /*
2422    * @return the count of region split plans executed
2423    */
2424   public long getSplitPlanCount() {
2425     return splitPlanCount;
2426   }
2427
2428   /*
2429    * @return the count of region merge plans executed
2430    */
2431   public long getMergePlanCount() {
2432     return mergePlanCount;
2433   }
2434
2435   @Override
2436   public boolean registerService(Service instance) {
2437     /*
2438      * No stacking of instances is allowed for a single service name
2439      */
2440     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
2441     String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
2442     if (coprocessorServiceHandlers.containsKey(serviceName)) {
2443       LOG.error("Coprocessor service "+serviceName+
2444           " already registered, rejecting request from "+instance
2445       );
2446       return false;
2447     }
2448
2449     coprocessorServiceHandlers.put(serviceName, instance);
2450     if (LOG.isDebugEnabled()) {
2451       LOG.debug("Registered master coprocessor service: service="+serviceName);
2452     }
2453     return true;
2454   }
2455
2456   /**
2457    * Utility for constructing an instance of the passed HMaster class.
2458    * @param masterClass
2459    * @return HMaster instance.
2460    */
2461   public static HMaster constructMaster(Class<? extends HMaster> masterClass,
2462       final Configuration conf, final CoordinatedStateManager cp)  {
2463     try {
2464       Constructor<? extends HMaster> c =
2465         masterClass.getConstructor(Configuration.class, CoordinatedStateManager.class);
2466       return c.newInstance(conf, cp);
2467     } catch(Exception e) {
2468       Throwable error = e;
2469       if (e instanceof InvocationTargetException &&
2470           ((InvocationTargetException)e).getTargetException() != null) {
2471         error = ((InvocationTargetException)e).getTargetException();
2472       }
2473       throw new RuntimeException("Failed construction of Master: " + masterClass.toString() + ". "
2474         , error);
2475     }
2476   }
2477
2478   /**
2479    * @see org.apache.hadoop.hbase.master.HMasterCommandLine
2480    */
2481   public static void main(String [] args) {
2482     VersionInfo.logVersion();
2483     new HMasterCommandLine(HMaster.class).doMain(args);
2484   }
2485
  /**
   * @return the chore that cleans obsolete HFiles from the archive.
   */
  public HFileCleaner getHFileCleaner() {
    return this.hfileCleaner;
  }
2489
2490   /**
2491    * @return the underlying snapshot manager
2492    */
2493   @Override
2494   public SnapshotManager getSnapshotManager() {
2495     return this.snapshotManager;
2496   }
2497
2498   /**
2499    * @return the underlying MasterProcedureManagerHost
2500    */
2501   @Override
2502   public MasterProcedureManagerHost getMasterProcedureManagerHost() {
2503     return mpmHost;
2504   }
2505
  @Override
  public ClusterSchema getClusterSchema() {
    return this.clusterSchemaService;
  }
2510
2511   /**
2512    * Create a new Namespace.
2513    * @param namespaceDescriptor descriptor for new Namespace
2514    * @param nonceGroup Identifier for the source of the request, a client or process.
2515    * @param nonce A unique identifier for this operation from the client or process identified by
2516    * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
2517    * @return procedure id
2518    */
2519   long createNamespace(final NamespaceDescriptor namespaceDescriptor, final long nonceGroup,
2520       final long nonce)
2521   throws IOException {
2522     checkInitialized();
2523     TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName()));
2524     if (this.cpHost != null && this.cpHost.preCreateNamespace(namespaceDescriptor)) {
2525       throw new BypassCoprocessorException();
2526     }
2527     LOG.info(getClientIdAuditPrefix() + " creating " + namespaceDescriptor);
2528     // Execute the operation synchronously - wait for the operation to complete before continuing.
2529     long procId = getClusterSchema().createNamespace(namespaceDescriptor, nonceGroup, nonce);
2530     if (this.cpHost != null) this.cpHost.postCreateNamespace(namespaceDescriptor);
2531     return procId;
2532   }
2533
2534   /**
2535    * Modify an existing Namespace.
2536    * @param nonceGroup Identifier for the source of the request, a client or process.
2537    * @param nonce A unique identifier for this operation from the client or process identified by
2538    * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
2539    * @return procedure id
2540    */
2541   long modifyNamespace(final NamespaceDescriptor namespaceDescriptor, final long nonceGroup,
2542       final long nonce)
2543   throws IOException {
2544     checkInitialized();
2545     TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName()));
2546     if (this.cpHost != null && this.cpHost.preModifyNamespace(namespaceDescriptor)) {
2547       throw new BypassCoprocessorException();
2548     }
2549     LOG.info(getClientIdAuditPrefix() + " modify " + namespaceDescriptor);
2550     // Execute the operation synchronously - wait for the operation to complete before continuing.
2551     long procId = getClusterSchema().modifyNamespace(namespaceDescriptor, nonceGroup, nonce);
2552     if (this.cpHost != null) this.cpHost.postModifyNamespace(namespaceDescriptor);
2553     return procId;
2554   }
2555
2556   /**
2557    * Delete an existing Namespace. Only empty Namespaces (no tables) can be removed.
2558    * @param nonceGroup Identifier for the source of the request, a client or process.
2559    * @param nonce A unique identifier for this operation from the client or process identified by
2560    * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
2561    * @return procedure id
2562    */
2563   long deleteNamespace(final String name, final long nonceGroup, final long nonce)
2564   throws IOException {
2565     checkInitialized();
2566     if (this.cpHost != null && this.cpHost.preDeleteNamespace(name)) {
2567       throw new BypassCoprocessorException();
2568     }
2569     LOG.info(getClientIdAuditPrefix() + " delete " + name);
2570     // Execute the operation synchronously - wait for the operation to complete before continuing.
2571     long procId = getClusterSchema().deleteNamespace(name, nonceGroup, nonce);
2572     if (this.cpHost != null) this.cpHost.postDeleteNamespace(name);
2573     return procId;
2574   }
2575
2576   /**
2577    * Get a Namespace
2578    * @param name Name of the Namespace
2579    * @return Namespace descriptor for <code>name</code>
2580    */
2581   NamespaceDescriptor getNamespace(String name) throws IOException {
2582     checkInitialized();
2583     if (this.cpHost != null) this.cpHost.preGetNamespaceDescriptor(name);
2584     NamespaceDescriptor nsd = this.clusterSchemaService.getNamespace(name);
2585     if (this.cpHost != null) this.cpHost.postGetNamespaceDescriptor(nsd);
2586     return nsd;
2587   }
2588
2589   /**
2590    * Get all Namespaces
2591    * @return All Namespace descriptors
2592    */
2593   List<NamespaceDescriptor> getNamespaces() throws IOException {
2594     checkInitialized();
2595     final List<NamespaceDescriptor> nsds = new ArrayList<NamespaceDescriptor>();
2596     boolean bypass = false;
2597     if (cpHost != null) {
2598       bypass = cpHost.preListNamespaceDescriptors(nsds);
2599     }
2600     if (!bypass) {
2601       nsds.addAll(this.clusterSchemaService.getNamespaces());
2602       if (this.cpHost != null) this.cpHost.postListNamespaceDescriptors(nsds);
2603     }
2604     return nsds;
2605   }
2606
  @Override
  public List<TableName> listTableNamesByNamespace(String name) throws IOException {
    checkInitialized();
    // Delegates to the general listing with no regex and system tables included.
    return listTableNames(name, null, true);
  }
2612
  @Override
  public List<HTableDescriptor> listTableDescriptorsByNamespace(String name) throws IOException {
    checkInitialized();
    // Delegates to the general listing with no regex/name filter and system tables included.
    return listTableDescriptors(name, null, null, true);
  }
2618
  /**
   * Abort a procedure, bracketing the call with coprocessor pre/post hooks.
   * @param procId id of the procedure to abort
   * @param mayInterruptIfRunning whether a running procedure may be interrupted
   * @return true if the procedure was aborted
   */
  @Override
  public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning)
      throws IOException {
    if (cpHost != null) {
      cpHost.preAbortProcedure(this.procedureExecutor, procId);
    }

    final boolean result = this.procedureExecutor.abort(procId, mayInterruptIfRunning);

    if (cpHost != null) {
      cpHost.postAbortProcedure();
    }

    return result;
  }
2634
  /**
   * List all procedures known to the procedure executor, bracketing the call
   * with coprocessor pre/post hooks.
   * @return the list of procedure descriptions
   */
  @Override
  public List<ProcedureInfo> listProcedures() throws IOException {
    if (cpHost != null) {
      cpHost.preListProcedures();
    }

    final List<ProcedureInfo> procInfoList = this.procedureExecutor.listProcedures();

    if (cpHost != null) {
      cpHost.postListProcedures(procInfoList);
    }

    return procInfoList;
  }
2649
2650   /**
2651    * Returns the list of table descriptors that match the specified request
2652    * @param namespace the namespace to query, or null if querying for all
2653    * @param regex The regular expression to match against, or null if querying for all
2654    * @param tableNameList the list of table names, or null if querying for all
2655    * @param includeSysTables False to match only against userspace tables
2656    * @return the list of table descriptors
2657    */
2658   public List<HTableDescriptor> listTableDescriptors(final String namespace, final String regex,
2659       final List<TableName> tableNameList, final boolean includeSysTables)
2660   throws IOException {
2661     List<HTableDescriptor> htds = new ArrayList<HTableDescriptor>();
2662     boolean bypass = cpHost != null?
2663         cpHost.preGetTableDescriptors(tableNameList, htds, regex): false;
2664     if (!bypass) {
2665       htds = getTableDescriptors(htds, namespace, regex, tableNameList, includeSysTables);
2666       if (cpHost != null) {
2667         cpHost.postGetTableDescriptors(tableNameList, htds, regex);
2668       }
2669     }
2670     return htds;
2671   }
2672
2673   /**
2674    * Returns the list of table names that match the specified request
2675    * @param regex The regular expression to match against, or null if querying for all
2676    * @param namespace the namespace to query, or null if querying for all
2677    * @param includeSysTables False to match only against userspace tables
2678    * @return the list of table names
2679    */
2680   public List<TableName> listTableNames(final String namespace, final String regex,
2681       final boolean includeSysTables) throws IOException {
2682     List<HTableDescriptor> htds = new ArrayList<HTableDescriptor>();
2683     boolean bypass = cpHost != null? cpHost.preGetTableNames(htds, regex): false;
2684     if (!bypass) {
2685       htds = getTableDescriptors(htds, namespace, regex, null, includeSysTables);
2686       if (cpHost != null) cpHost.postGetTableNames(htds, regex);
2687     }
2688     List<TableName> result = new ArrayList<TableName>(htds.size());
2689     for (HTableDescriptor htd: htds) result.add(htd.getTableName());
2690     return result;
2691   }
2692
2693   /**
2694    * @return list of table table descriptors after filtering by regex and whether to include system
2695    *    tables, etc.
2696    * @throws IOException
2697    */
2698   private List<HTableDescriptor> getTableDescriptors(final List<HTableDescriptor> htds,
2699       final String namespace, final String regex, final List<TableName> tableNameList,
2700       final boolean includeSysTables)
2701   throws IOException {
2702     if (tableNameList == null || tableNameList.size() == 0) {
2703       // request for all TableDescriptors
2704       Collection<HTableDescriptor> allHtds;
2705       if (namespace != null && namespace.length() > 0) {
2706         // Do a check on the namespace existence. Will fail if does not exist.
2707         this.clusterSchemaService.getNamespace(namespace);
2708         allHtds = tableDescriptors.getByNamespace(namespace).values();
2709       } else {
2710         allHtds = tableDescriptors.getAll().values();
2711       }
2712       for (HTableDescriptor desc: allHtds) {
2713         if (tableStateManager.isTablePresent(desc.getTableName())
2714             && (includeSysTables || !desc.getTableName().isSystemTable())) {
2715           htds.add(desc);
2716         }
2717       }
2718     } else {
2719       for (TableName s: tableNameList) {
2720         if (tableStateManager.isTablePresent(s)) {
2721           HTableDescriptor desc = tableDescriptors.get(s);
2722           if (desc != null) {
2723             htds.add(desc);
2724           }
2725         }
2726       }
2727     }
2728
2729     // Retains only those matched by regular expression.
2730     if (regex != null) filterTablesByRegex(htds, Pattern.compile(regex));
2731     return htds;
2732   }
2733
2734   /**
2735    * Removes the table descriptors that don't match the pattern.
2736    * @param descriptors list of table descriptors to filter
2737    * @param pattern the regex to use
2738    */
2739   private static void filterTablesByRegex(final Collection<HTableDescriptor> descriptors,
2740       final Pattern pattern) {
2741     final String defaultNS = NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
2742     Iterator<HTableDescriptor> itr = descriptors.iterator();
2743     while (itr.hasNext()) {
2744       HTableDescriptor htd = itr.next();
2745       String tableName = htd.getTableName().getNameAsString();
2746       boolean matched = pattern.matcher(tableName).matches();
2747       if (!matched && htd.getTableName().getNamespaceAsString().equals(defaultNS)) {
2748         matched = pattern.matcher(defaultNS + TableName.NAMESPACE_DELIM + tableName).matches();
2749       }
2750       if (!matched) {
2751         itr.remove();
2752       }
2753     }
2754   }
2755
  @Override
  public long getLastMajorCompactionTimestamp(TableName table) throws IOException {
    // Delegates to the freshly-built cluster status.
    return getClusterStatus().getLastMajorCompactionTsForTable(table);
  }
2760
  @Override
  public long getLastMajorCompactionTimestampForRegion(byte[] regionName) throws IOException {
    // Delegates to the freshly-built cluster status.
    return getClusterStatus().getLastMajorCompactionTsForRegion(regionName);
  }
2765
2766   /**
2767    * Gets the mob file compaction state for a specific table.
2768    * Whether all the mob files are selected is known during the compaction execution, but
2769    * the statistic is done just before compaction starts, it is hard to know the compaction
2770    * type at that time, so the rough statistics are chosen for the mob file compaction. Only two
2771    * compaction states are available, CompactionState.MAJOR_AND_MINOR and CompactionState.NONE.
2772    * @param tableName The current table name.
2773    * @return If a given table is in mob file compaction now.
2774    */
2775   public CompactionState getMobCompactionState(TableName tableName) {
2776     AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
2777     if (compactionsCount != null && compactionsCount.get() != 0) {
2778       return CompactionState.MAJOR_AND_MINOR;
2779     }
2780     return CompactionState.NONE;
2781   }
2782
  /**
   * Records the start of a mob compaction on the given table by incrementing
   * its per-table counter under the table's id lock.
   * @param tableName table whose mob compaction is starting
   */
  public void reportMobCompactionStart(TableName tableName) throws IOException {
    IdLock.Entry lockEntry = null;
    try {
      // Serialize counter updates per table via the id lock.
      lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
      AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
      if (compactionsCount == null) {
        compactionsCount = new AtomicInteger(0);
        mobCompactionStates.put(tableName, compactionsCount);
      }
      compactionsCount.incrementAndGet();
    } finally {
      if (lockEntry != null) {
        mobCompactionLock.releaseLockEntry(lockEntry);
      }
    }
  }
2799
  /**
   * Records the end of a mob compaction on the given table by decrementing
   * its per-table counter under the table's id lock.
   * @param tableName table whose mob compaction has finished
   */
  public void reportMobCompactionEnd(TableName tableName) throws IOException {
    IdLock.Entry lockEntry = null;
    try {
      // Serialize counter updates per table via the id lock.
      lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
      AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
      if (compactionsCount != null) {
        int count = compactionsCount.decrementAndGet();
        // remove the entry if the count is 0.
        if (count == 0) {
          mobCompactionStates.remove(tableName);
        }
      }
    } finally {
      if (lockEntry != null) {
        mobCompactionLock.releaseLockEntry(lockEntry);
      }
    }
  }
2818
2819   /**
2820    * Requests mob compaction.
2821    * @param tableName The table the compact.
2822    * @param columns The compacted columns.
2823    * @param allFiles Whether add all mob files into the compaction.
2824    */
2825   public void requestMobCompaction(TableName tableName,
2826     List<HColumnDescriptor> columns, boolean allFiles) throws IOException {
2827     mobCompactThread.requestMobCompaction(conf, fs, tableName, columns,
2828       tableLockManager, allFiles);
2829   }
2830
2831   /**
2832    * Queries the state of the {@link LoadBalancerTracker}. If the balancer is not initialized,
2833    * false is returned.
2834    *
2835    * @return The state of the load balancer, or false if the load balancer isn't defined.
2836    */
2837   public boolean isBalancerOn() {
2838     if (null == loadBalancerTracker) return false;
2839     return loadBalancerTracker.isBalancerOn();
2840   }
2841
2842   /**
2843    * Queries the state of the {@link RegionNormalizerTracker}. If it's not initialized,
2844    * false is returned.
2845    */
2846   public boolean isNormalizerOn() {
2847     return null == regionNormalizerTracker? false: regionNormalizerTracker.isNormalizerOn();
2848   }
2849
2850
2851   /**
2852    * Queries the state of the {@link SplitOrMergeTracker}. If it is not initialized,
2853    * false is returned. If switchType is illegal, false will return.
2854    * @param switchType see {@link org.apache.hadoop.hbase.client.MasterSwitchType}
2855    * @return The state of the switch
2856    */
2857   public boolean isSplitOrMergeEnabled(MasterSwitchType switchType) {
2858     if (null == splitOrMergeTracker) {
2859       return false;
2860     }
2861     return splitOrMergeTracker.isSplitOrMergeEnabled(switchType);
2862   }
2863
2864   /**
2865    * Fetch the configured {@link LoadBalancer} class name. If none is set, a default is returned.
2866    *
2867    * @return The name of the {@link LoadBalancer} in use.
2868    */
2869   public String getLoadBalancerClassName() {
2870     return conf.get(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, LoadBalancerFactory
2871         .getDefaultLoadBalancerClass().getName());
2872   }
2873
2874   /**
2875    * @return RegionNormalizerTracker instance
2876    */
2877   public RegionNormalizerTracker getRegionNormalizerTracker() {
2878     return regionNormalizerTracker;
2879   }
2880
  /**
   * @return tracker for the cluster-wide split/merge switches.
   */
  public SplitOrMergeTracker getSplitOrMergeTracker() {
    return splitOrMergeTracker;
  }
2884
  @Override
  public LoadBalancer getLoadBalancer() {
    return balancer;
  }
2889 }