1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.lang.reflect.Constructor;
24 import java.lang.reflect.InvocationTargetException;
25 import java.net.InetAddress;
26 import java.net.InetSocketAddress;
27 import java.net.UnknownHostException;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.Collections;
31 import java.util.Comparator;
32 import java.util.HashSet;
33 import java.util.Iterator;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Set;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicReference;
39 import java.util.regex.Pattern;
40
41 import javax.servlet.ServletException;
42 import javax.servlet.http.HttpServlet;
43 import javax.servlet.http.HttpServletRequest;
44 import javax.servlet.http.HttpServletResponse;
45
46 import org.apache.commons.logging.Log;
47 import org.apache.commons.logging.LogFactory;
48 import org.apache.hadoop.conf.Configuration;
49 import org.apache.hadoop.fs.Path;
50 import org.apache.hadoop.hbase.ClusterStatus;
51 import org.apache.hadoop.hbase.CoordinatedStateException;
52 import org.apache.hadoop.hbase.CoordinatedStateManager;
53 import org.apache.hadoop.hbase.DoNotRetryIOException;
54 import org.apache.hadoop.hbase.HBaseIOException;
55 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
56 import org.apache.hadoop.hbase.HColumnDescriptor;
57 import org.apache.hadoop.hbase.HConstants;
58 import org.apache.hadoop.hbase.HRegionInfo;
59 import org.apache.hadoop.hbase.HTableDescriptor;
60 import org.apache.hadoop.hbase.MasterNotRunningException;
61 import org.apache.hadoop.hbase.MetaMigrationConvertingToPB;
62 import org.apache.hadoop.hbase.MetaTableAccessor;
63 import org.apache.hadoop.hbase.NamespaceDescriptor;
64 import org.apache.hadoop.hbase.NamespaceNotFoundException;
65 import org.apache.hadoop.hbase.PleaseHoldException;
66 import org.apache.hadoop.hbase.ProcedureInfo;
67 import org.apache.hadoop.hbase.Server;
68 import org.apache.hadoop.hbase.ServerLoad;
69 import org.apache.hadoop.hbase.ServerName;
70 import org.apache.hadoop.hbase.TableDescriptors;
71 import org.apache.hadoop.hbase.TableName;
72 import org.apache.hadoop.hbase.TableNotDisabledException;
73 import org.apache.hadoop.hbase.TableNotFoundException;
74 import org.apache.hadoop.hbase.UnknownRegionException;
75 import org.apache.hadoop.hbase.classification.InterfaceAudience;
76 import org.apache.hadoop.hbase.client.MetaScanner;
77 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
78 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
79 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
80 import org.apache.hadoop.hbase.client.Result;
81 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
82 import org.apache.hadoop.hbase.exceptions.DeserializationException;
83 import org.apache.hadoop.hbase.executor.ExecutorType;
84 import org.apache.hadoop.hbase.ipc.RpcServer;
85 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
86 import org.apache.hadoop.hbase.master.MasterRpcServices.BalanceSwitchMode;
87 import org.apache.hadoop.hbase.master.RegionState.State;
88 import org.apache.hadoop.hbase.master.balancer.BalancerChore;
89 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
90 import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
91 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
92 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
93 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
94 import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore;
95 import org.apache.hadoop.hbase.master.handler.CreateTableHandler;
96 import org.apache.hadoop.hbase.master.handler.DeleteTableHandler;
97 import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
98 import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
99 import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
100 import org.apache.hadoop.hbase.master.handler.ModifyTableHandler;
101 import org.apache.hadoop.hbase.master.handler.TableAddFamilyHandler;
102 import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler;
103 import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler;
104 import org.apache.hadoop.hbase.master.handler.TruncateTableHandler;
105 import org.apache.hadoop.hbase.master.procedure.AddColumnFamilyProcedure;
106 import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
107 import org.apache.hadoop.hbase.master.procedure.DeleteColumnFamilyProcedure;
108 import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
109 import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
110 import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
111 import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
112 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
113 import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure;
114 import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
115 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
116 import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
117 import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
118 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
119 import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
120 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
121 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
122 import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
123 import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
124 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
125 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
126 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
127 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
128 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
129 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
130 import org.apache.hadoop.hbase.quotas.RegionStateListener;
131 import org.apache.hadoop.hbase.regionserver.HRegionServer;
132 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
133 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
134 import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
135 import org.apache.hadoop.hbase.replication.regionserver.Replication;
136 import org.apache.hadoop.hbase.security.UserProvider;
137 import org.apache.hadoop.hbase.util.Addressing;
138 import org.apache.hadoop.hbase.util.Bytes;
139 import org.apache.hadoop.hbase.util.CompressionTest;
140 import org.apache.hadoop.hbase.util.ConfigUtil;
141 import org.apache.hadoop.hbase.util.EncryptionTest;
142 import org.apache.hadoop.hbase.util.FSUtils;
143 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
144 import org.apache.hadoop.hbase.util.HasThread;
145 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
146 import org.apache.hadoop.hbase.util.Pair;
147 import org.apache.hadoop.hbase.util.Threads;
148 import org.apache.hadoop.hbase.util.VersionInfo;
149 import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
150 import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
151 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
152 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
153 import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
154 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
155 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
156 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
157 import org.apache.zookeeper.KeeperException;
158 import org.mortbay.jetty.Connector;
159 import org.mortbay.jetty.nio.SelectChannelConnector;
160 import org.mortbay.jetty.servlet.Context;
161
162 import com.google.common.annotations.VisibleForTesting;
163 import com.google.common.collect.Maps;
164 import com.google.protobuf.Descriptors;
165 import com.google.protobuf.Service;
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
183 @SuppressWarnings("deprecation")
184 public class HMaster extends HRegionServer implements MasterServices, Server {
185 private static final Log LOG = LogFactory.getLog(HMaster.class.getName());
186
187
188
189
190
191 private static class InitializationMonitor extends HasThread {
192
193 public static final String TIMEOUT_KEY = "hbase.master.initializationmonitor.timeout";
194 public static final long TIMEOUT_DEFAULT = TimeUnit.MILLISECONDS.convert(15, TimeUnit.MINUTES);
195
196
197
198
199
200 public static final String HALT_KEY = "hbase.master.initializationmonitor.haltontimeout";
201 public static final boolean HALT_DEFAULT = false;
202
203 private final HMaster master;
204 private final long timeout;
205 private final boolean haltOnTimeout;
206
207
208 InitializationMonitor(HMaster master) {
209 super("MasterInitializationMonitor");
210 this.master = master;
211 this.timeout = master.getConfiguration().getLong(TIMEOUT_KEY, TIMEOUT_DEFAULT);
212 this.haltOnTimeout = master.getConfiguration().getBoolean(HALT_KEY, HALT_DEFAULT);
213 this.setDaemon(true);
214 }
215
216 @Override
217 public void run() {
218 try {
219 while (!master.isStopped() && master.isActiveMaster()) {
220 Thread.sleep(timeout);
221 if (master.isInitialized()) {
222 LOG.debug("Initialization completed within allotted tolerance. Monitor exiting.");
223 } else {
224 LOG.error("Master failed to complete initialization after " + timeout + "ms. Please"
225 + " consider submitting a bug report including a thread dump of this process.");
226 if (haltOnTimeout) {
227 LOG.error("Zombie Master exiting. Thread dump to stdout");
228 Threads.printThreadInfo(System.out, "Zombie HMaster");
229 System.exit(-1);
230 }
231 }
232 }
233 } catch (InterruptedException ie) {
234 LOG.trace("InitMonitor thread interrupted. Existing.");
235 }
236 }
237 }
238
239
240
241 public static final String MASTER = "master";
242
243
244 private final ActiveMasterManager activeMasterManager;
245
246 RegionServerTracker regionServerTracker;
247
248 private DrainingServerTracker drainingServerTracker;
249
250 LoadBalancerTracker loadBalancerTracker;
251
252
253 private TableNamespaceManager tableNamespaceManager;
254
255
256 final MetricsMaster metricsMaster;
257
258 private MasterFileSystem fileSystemManager;
259
260
261 volatile ServerManager serverManager;
262
263
264 AssignmentManager assignmentManager;
265
266
267
268
269 MemoryBoundedLogMessageBuffer rsFatals;
270
271
272 private volatile boolean isActiveMaster = false;
273
274
275
276 volatile boolean initialized = false;
277
278
279
280 volatile boolean serviceStarted = false;
281
282
283 private volatile boolean serverShutdownHandlerEnabled = false;
284
285 LoadBalancer balancer;
286 private BalancerChore balancerChore;
287 private ClusterStatusChore clusterStatusChore;
288 private ClusterStatusPublisher clusterStatusPublisherChore = null;
289
290 CatalogJanitor catalogJanitorChore;
291 private ReplicationZKLockCleanerChore replicationZKLockCleanerChore;
292 private LogCleaner logCleaner;
293 private HFileCleaner hfileCleaner;
294
295 MasterCoprocessorHost cpHost;
296
297 private final boolean preLoadTableDescriptors;
298
299
300 private long masterActiveTime;
301
302
303 private final boolean masterCheckCompression;
304
305
306 private final boolean masterCheckEncryption;
307
308
309 private enum ProcedureConf {
310 PROCEDURE_ENABLED,
311 HANDLER_USED,
312 PROCEDURE_FULLY_DISABLED,
313 }
314 private final ProcedureConf procedureConf;
315
316 Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
317
318
319 SnapshotManager snapshotManager;
320
321 MasterProcedureManagerHost mpmHost;
322
323
324 private volatile MasterQuotaManager quotaManager;
325
326 private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
327 private WALProcedureStore procedureStore;
328
329
330 private volatile boolean initializationBeforeMetaAssignment = false;
331
332
333 private org.mortbay.jetty.Server masterJettyServer;
334
335 public static class RedirectServlet extends HttpServlet {
336 private static final long serialVersionUID = 2894774810058302472L;
337 private static int regionServerInfoPort;
338
339 @Override
340 public void doGet(HttpServletRequest request,
341 HttpServletResponse response) throws ServletException, IOException {
342 String redirectUrl = request.getScheme() + "://"
343 + request.getServerName() + ":" + regionServerInfoPort
344 + request.getRequestURI();
345 response.sendRedirect(redirectUrl);
346 }
347 }
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365 public HMaster(final Configuration conf, CoordinatedStateManager csm)
366 throws IOException, KeeperException, InterruptedException {
367 super(conf, csm);
368 this.rsFatals = new MemoryBoundedLogMessageBuffer(
369 conf.getLong("hbase.master.buffer.for.rs.fatals", 1*1024*1024));
370
371 LOG.info("hbase.rootdir=" + FSUtils.getRootDir(this.conf) +
372 ", hbase.cluster.distributed=" + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));
373
374
375 this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
376
377 Replication.decorateMasterConfiguration(this.conf);
378
379
380
381 if (this.conf.get("mapreduce.task.attempt.id") == null) {
382 this.conf.set("mapreduce.task.attempt.id", "hb_m_" + this.serverName.toString());
383 }
384
385
386 this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true);
387
388
389 this.masterCheckEncryption = conf.getBoolean("hbase.master.check.encryption", true);
390
391 this.metricsMaster = new MetricsMaster(new MetricsMasterWrapperImpl(this));
392
393
394
395
396 String procedureConfString = conf.get("hbase.master.procedure.tableddl", "enabled");
397 if (procedureConfString.equalsIgnoreCase("disabled")) {
398 LOG.info("Master will use handler for new table DDL"
399 + " and all unfinished table DDLs in procedure store will be disgarded.");
400 this.procedureConf = ProcedureConf.PROCEDURE_FULLY_DISABLED;
401 } else if (procedureConfString.equalsIgnoreCase("unused")) {
402 LOG.info("Master will use handler for new table DDL"
403 + " and all unfinished table DDLs in procedure store will continue to execute.");
404 this.procedureConf = ProcedureConf.HANDLER_USED;
405 } else {
406 this.procedureConf = ProcedureConf.PROCEDURE_ENABLED;
407 }
408
409 this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);
410
411
412
413 boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
414 HConstants.STATUS_PUBLISHED_DEFAULT);
415 Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
416 conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
417 ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
418 ClusterStatusPublisher.Publisher.class);
419
420 if (shouldPublish) {
421 if (publisherClass == null) {
422 LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
423 ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS +
424 " is not set - not publishing status");
425 } else {
426 clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
427 getChoreService().scheduleChore(clusterStatusPublisherChore);
428 }
429 }
430
431
432 if (!conf.getBoolean("hbase.testing.nocluster", false)) {
433 activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this);
434 int infoPort = putUpJettyServer();
435 startActiveMasterManager(infoPort);
436 } else {
437 activeMasterManager = null;
438 }
439 }
440
441
442 private int putUpJettyServer() throws IOException {
443 if (!conf.getBoolean("hbase.master.infoserver.redirect", true)) {
444 return -1;
445 }
446 int infoPort = conf.getInt("hbase.master.info.port.orig",
447 HConstants.DEFAULT_MASTER_INFOPORT);
448
449 if (infoPort < 0 || infoServer == null) {
450 return -1;
451 }
452 String addr = conf.get("hbase.master.info.bindAddress", "0.0.0.0");
453 if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
454 String msg =
455 "Failed to start redirecting jetty server. Address " + addr
456 + " does not belong to this host. Correct configuration parameter: "
457 + "hbase.master.info.bindAddress";
458 LOG.error(msg);
459 throw new IOException(msg);
460 }
461
462 RedirectServlet.regionServerInfoPort = infoServer.getPort();
463 if(RedirectServlet.regionServerInfoPort == infoPort) {
464 return infoPort;
465 }
466 masterJettyServer = new org.mortbay.jetty.Server();
467 Connector connector = new SelectChannelConnector();
468 connector.setHost(addr);
469 connector.setPort(infoPort);
470 masterJettyServer.addConnector(connector);
471 masterJettyServer.setStopAtShutdown(true);
472 Context context = new Context(masterJettyServer, "/", Context.NO_SESSIONS);
473 context.addServlet(RedirectServlet.class, "/*");
474 try {
475 masterJettyServer.start();
476 } catch (Exception e) {
477 throw new IOException("Failed to start redirecting jetty server", e);
478 }
479 return connector.getLocalPort();
480 }
481
482
483
484
485 @Override
486 protected void login(UserProvider user, String host) throws IOException {
487 try {
488 super.login(user, host);
489 } catch (IOException ie) {
490 user.login("hbase.master.keytab.file",
491 "hbase.master.kerberos.principal", host);
492 }
493 }
494
495
496
497
498
499
500 @Override
501 protected void waitForMasterActive(){
502 boolean tablesOnMaster = BaseLoadBalancer.tablesOnMaster(conf);
503 while (!(tablesOnMaster && isActiveMaster)
504 && !isStopped() && !isAborted()) {
505 sleeper.sleep();
506 }
507 }
508
509 @VisibleForTesting
510 public MasterRpcServices getMasterRpcServices() {
511 return (MasterRpcServices)rpcServices;
512 }
513
514 public boolean balanceSwitch(final boolean b) throws IOException {
515 return getMasterRpcServices().switchBalancer(b, BalanceSwitchMode.ASYNC);
516 }
517
518 @Override
519 protected String getProcessName() {
520 return MASTER;
521 }
522
523 @Override
524 protected boolean canCreateBaseZNode() {
525 return true;
526 }
527
528 @Override
529 protected boolean canUpdateTableDescriptor() {
530 return true;
531 }
532
533 @Override
534 protected RSRpcServices createRpcServices() throws IOException {
535 return new MasterRpcServices(this);
536 }
537
538 @Override
539 protected void configureInfoServer() {
540 infoServer.addServlet("master-status", "/master-status", MasterStatusServlet.class);
541 infoServer.setAttribute(MASTER, this);
542 if (BaseLoadBalancer.tablesOnMaster(conf)) {
543 super.configureInfoServer();
544 }
545 }
546
547 @Override
548 protected Class<? extends HttpServlet> getDumpServlet() {
549 return MasterDumpServlet.class;
550 }
551
552
553
554
555
556 @Override
557 protected void doMetrics() {
558 try {
559 if (assignmentManager != null) {
560 assignmentManager.updateRegionsInTransitionMetrics();
561 }
562 } catch (Throwable e) {
563 LOG.error("Couldn't update metrics: " + e.getMessage());
564 }
565 }
566
567 MetricsMaster getMasterMetrics() {
568 return metricsMaster;
569 }
570
571
572
573
574
575
576
577
578 void initializeZKBasedSystemTrackers() throws IOException,
579 InterruptedException, KeeperException, CoordinatedStateException {
580 this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
581 this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
582 this.loadBalancerTracker.start();
583 this.assignmentManager = new AssignmentManager(this, serverManager,
584 this.balancer, this.service, this.metricsMaster,
585 this.tableLockManager);
586 zooKeeper.registerListenerFirst(assignmentManager);
587
588 this.regionServerTracker = new RegionServerTracker(zooKeeper, this,
589 this.serverManager);
590 this.regionServerTracker.start();
591
592 this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this,
593 this.serverManager);
594 this.drainingServerTracker.start();
595
596
597
598 boolean wasUp = this.clusterStatusTracker.isClusterUp();
599 if (!wasUp) this.clusterStatusTracker.setClusterUp();
600
601 LOG.info("Server active/primary master=" + this.serverName +
602 ", sessionid=0x" +
603 Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
604 ", setting cluster-up flag (Was=" + wasUp + ")");
605
606
607 this.snapshotManager = new SnapshotManager();
608 this.mpmHost = new MasterProcedureManagerHost();
609 this.mpmHost.register(this.snapshotManager);
610 this.mpmHost.register(new MasterFlushTableProcedureManager());
611 this.mpmHost.loadProcedures(conf);
612 this.mpmHost.initialize(this, this.metricsMaster);
613 }
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635 private void finishActiveMasterInitialization(MonitoredTask status)
636 throws IOException, InterruptedException, KeeperException, CoordinatedStateException {
637
638 isActiveMaster = true;
639 Thread zombieDetector = new Thread(new InitializationMonitor(this));
640 zombieDetector.start();
641
642
643
644
645
646
647
648 status.setStatus("Initializing Master file system");
649
650 this.masterActiveTime = System.currentTimeMillis();
651
652 this.fileSystemManager = new MasterFileSystem(this, this);
653
654
655 this.tableDescriptors.setCacheOn();
656
657 this.tableDescriptors.get(TableName.META_TABLE_NAME).setRegionReplication(
658 conf.getInt(HConstants.META_REPLICAS_NUM, HConstants.DEFAULT_META_REPLICA_NUM));
659
660 if (preLoadTableDescriptors) {
661 status.setStatus("Pre-loading table descriptors");
662 this.tableDescriptors.getAll();
663 }
664
665
666 status.setStatus("Publishing Cluster ID in ZooKeeper");
667 ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
668 this.serverManager = createServerManager(this, this);
669
670 setupClusterConnection();
671
672
673 this.tableLockManager.reapWriteLocks();
674
675 status.setStatus("Initializing ZK system trackers");
676 initializeZKBasedSystemTrackers();
677
678
679 status.setStatus("Initializing master coprocessors");
680 this.cpHost = new MasterCoprocessorHost(this, this.conf);
681
682
683 status.setStatus("Initializing master service threads");
684 startServiceThreads();
685
686
687 sleeper.skipSleepCycle();
688
689
690 this.serverManager.waitForRegionServers(status);
691
692 for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
693
694 if (!this.serverManager.isServerOnline(sn)
695 && serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
696 LOG.info("Registered server found up in zk but who has not yet reported in: " + sn);
697 }
698 }
699
700
701
702
703 Set<ServerName> previouslyFailedServers = this.fileSystemManager
704 .getFailedServersFromLogFolders();
705
706
707 this.fileSystemManager.removeStaleRecoveringRegionsFromZK(previouslyFailedServers);
708
709
710 ServerName oldMetaServerLocation = metaTableLocator.getMetaRegionLocation(this.getZooKeeper());
711 if (oldMetaServerLocation != null && previouslyFailedServers.contains(oldMetaServerLocation)) {
712 splitMetaLogBeforeAssignment(oldMetaServerLocation);
713
714
715 }
716 Set<ServerName> previouslyFailedMetaRSs = getPreviouselyFailedMetaServersFromZK();
717
718
719
720
721
722
723
724
725 previouslyFailedMetaRSs.addAll(previouslyFailedServers);
726
727 this.initializationBeforeMetaAssignment = true;
728
729
730 if (BaseLoadBalancer.tablesOnMaster(conf)) {
731 waitForServerOnline();
732 }
733
734
735 this.balancer.setClusterStatus(getClusterStatus());
736 this.balancer.setMasterServices(this);
737 this.balancer.initialize();
738
739
740
741 if(isStopped()) return;
742
743
744 status.setStatus("Assigning Meta Region");
745 assignMeta(status, previouslyFailedMetaRSs, HRegionInfo.DEFAULT_REPLICA_ID);
746
747
748 if(isStopped()) return;
749
750 status.setStatus("Submitting log splitting work for previously failed region servers");
751
752
753 for (ServerName tmpServer : previouslyFailedServers) {
754 this.serverManager.processDeadServer(tmpServer, true);
755 }
756
757
758
759
760 if (this.conf.getBoolean("hbase.MetaMigrationConvertingToPB", true)) {
761 MetaMigrationConvertingToPB.updateMetaIfNecessary(this);
762 }
763
764
765 status.setStatus("Starting assignment manager");
766 this.assignmentManager.joinCluster();
767
768
769 this.balancer.setClusterStatus(getClusterStatus());
770
771
772
773 status.setStatus("Starting balancer and catalog janitor");
774 this.clusterStatusChore = new ClusterStatusChore(this, balancer);
775 getChoreService().scheduleChore(clusterStatusChore);
776 this.balancerChore = new BalancerChore(this);
777 getChoreService().scheduleChore(balancerChore);
778 this.catalogJanitorChore = new CatalogJanitor(this, this);
779 getChoreService().scheduleChore(catalogJanitorChore);
780
781 status.setStatus("Starting namespace manager");
782 initNamespace();
783
784 if (this.cpHost != null) {
785 try {
786 this.cpHost.preMasterInitialization();
787 } catch (IOException e) {
788 LOG.error("Coprocessor preMasterInitialization() hook failed", e);
789 }
790 }
791
792 status.markComplete("Initialization successful");
793 LOG.info("Master has completed initialization");
794 configurationManager.registerObserver(this.balancer);
795 initialized = true;
796
797 status.setStatus("Starting quota manager");
798 initQuotaManager();
799
800
801 Set<ServerName> EMPTY_SET = new HashSet<ServerName>();
802 int numReplicas = conf.getInt(HConstants.META_REPLICAS_NUM,
803 HConstants.DEFAULT_META_REPLICA_NUM);
804 for (int i = 1; i < numReplicas; i++) {
805 assignMeta(status, EMPTY_SET, i);
806 }
807 unassignExcessMetaReplica(zooKeeper, numReplicas);
808
809
810
811
812 this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
813
814
815 status.setStatus("Checking ZNode ACLs");
816 zooKeeper.checkAndSetZNodeAcls();
817
818 status.setStatus("Calling postStartMaster coprocessors");
819 if (this.cpHost != null) {
820
821 try {
822 this.cpHost.postStartMaster();
823 } catch (IOException ioe) {
824 LOG.error("Coprocessor postStartMaster() hook failed", ioe);
825 }
826 }
827
828 zombieDetector.interrupt();
829 }
830
831 private void initQuotaManager() throws IOException {
832 quotaManager = new MasterQuotaManager(this);
833 this.assignmentManager.setRegionStateListener((RegionStateListener) quotaManager);
834 quotaManager.start();
835 }
836
837
838
839
840
841
842
843
844
845 ServerManager createServerManager(final Server master,
846 final MasterServices services)
847 throws IOException {
848
849
850 return new ServerManager(master, services);
851 }
852
853 private void unassignExcessMetaReplica(ZooKeeperWatcher zkw, int numMetaReplicasConfigured) {
854
855
856 try {
857 List<String> metaReplicaZnodes = zooKeeper.getMetaReplicaNodes();
858 for (String metaReplicaZnode : metaReplicaZnodes) {
859 int replicaId = zooKeeper.getMetaReplicaIdFromZnode(metaReplicaZnode);
860 if (replicaId >= numMetaReplicasConfigured) {
861 RegionState r = MetaTableLocator.getMetaRegionState(zkw, replicaId);
862 LOG.info("Closing excess replica of meta region " + r.getRegion());
863
864 ServerManager.closeRegionSilentlyAndWait(getConnection(), r.getServerName(),
865 r.getRegion(), 30000);
866 ZKUtil.deleteNode(zkw, zkw.getZNodeForReplica(replicaId));
867 }
868 }
869 } catch (Exception ex) {
870
871
872 LOG.warn("Ignoring exception " + ex);
873 }
874 }
875
876
877
878
879
880
881
882
883
884
885 void assignMeta(MonitoredTask status, Set<ServerName> previouslyFailedMetaRSs, int replicaId)
886 throws InterruptedException, IOException, KeeperException {
887
888 int assigned = 0;
889 long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000);
890 if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
891 status.setStatus("Assigning hbase:meta region");
892 } else {
893 status.setStatus("Assigning hbase:meta region, replicaId " + replicaId);
894 }
895
896 RegionStates regionStates = assignmentManager.getRegionStates();
897 RegionState metaState = MetaTableLocator.getMetaRegionState(getZooKeeper(), replicaId);
898 HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO,
899 replicaId);
900 ServerName currentMetaServer = metaState.getServerName();
901 if (!ConfigUtil.useZKForAssignment(conf)) {
902 regionStates.createRegionState(hri, metaState.getState(),
903 currentMetaServer, null);
904 } else {
905 regionStates.createRegionState(hri);
906 }
907 boolean rit = this.assignmentManager.
908 processRegionInTransitionAndBlockUntilAssigned(hri);
909 boolean metaRegionLocation = metaTableLocator.verifyMetaRegionLocation(
910 this.getConnection(), this.getZooKeeper(), timeout, replicaId);
911 if (!metaRegionLocation || !metaState.isOpened()) {
912
913
914 assigned++;
915 if (!ConfigUtil.useZKForAssignment(conf)) {
916 assignMetaZkLess(regionStates, metaState, timeout, previouslyFailedMetaRSs);
917 } else if (!rit) {
918
919 if (currentMetaServer != null) {
920
921
922
923
924
925
926
927 if (serverManager.isServerOnline(currentMetaServer)) {
928 LOG.info("Forcing expire of " + currentMetaServer);
929 serverManager.expireServer(currentMetaServer);
930 }
931 if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
932 splitMetaLogBeforeAssignment(currentMetaServer);
933 previouslyFailedMetaRSs.add(currentMetaServer);
934 }
935 }
936 assignmentManager.assignMeta(hri);
937 }
938 } else {
939
940 regionStates.updateRegionState(
941 HRegionInfo.FIRST_META_REGIONINFO, State.OPEN, currentMetaServer);
942 this.assignmentManager.regionOnline(
943 HRegionInfo.FIRST_META_REGIONINFO, currentMetaServer);
944 }
945
946 if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) enableMeta(TableName.META_TABLE_NAME);
947
948 if ((RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode())
949 && (!previouslyFailedMetaRSs.isEmpty())) {
950
951 status.setStatus("replaying log for Meta Region");
952 this.fileSystemManager.splitMetaLog(previouslyFailedMetaRSs);
953 }
954
955
956
957
958
959 if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) enableServerShutdownHandler(assigned != 0);
960 LOG.info("hbase:meta with replicaId " + replicaId + " assigned=" + assigned + ", rit=" + rit +
961 ", location=" + metaTableLocator.getMetaRegionLocation(this.getZooKeeper(), replicaId));
962 status.setStatus("META assigned.");
963 }
964
965 private void assignMetaZkLess(RegionStates regionStates, RegionState regionState, long timeout,
966 Set<ServerName> previouslyFailedRs) throws IOException, KeeperException {
967 ServerName currentServer = regionState.getServerName();
968 if (serverManager.isServerOnline(currentServer)) {
969 LOG.info("Meta was in transition on " + currentServer);
970 assignmentManager.processRegionInTransitionZkLess();
971 } else {
972 if (currentServer != null) {
973 if (regionState.getRegion().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
974 splitMetaLogBeforeAssignment(currentServer);
975 regionStates.logSplit(HRegionInfo.FIRST_META_REGIONINFO);
976 previouslyFailedRs.add(currentServer);
977 }
978 }
979 LOG.info("Re-assigning hbase:meta, it was on " + currentServer);
980 regionStates.updateRegionState(regionState.getRegion(), State.OFFLINE);
981 assignmentManager.assignMeta(regionState.getRegion());
982 }
983 }
984
985 void initNamespace() throws IOException {
986
987 tableNamespaceManager = new TableNamespaceManager(this);
988 tableNamespaceManager.start();
989 }
990
991 boolean isCatalogJanitorEnabled() {
992 return catalogJanitorChore != null ?
993 catalogJanitorChore.getEnabled() : false;
994 }
995
996 private void splitMetaLogBeforeAssignment(ServerName currentMetaServer) throws IOException {
997 if (RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode()) {
998
999 Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
1000 regions.add(HRegionInfo.FIRST_META_REGIONINFO);
1001 this.fileSystemManager.prepareLogReplay(currentMetaServer, regions);
1002 } else {
1003
1004 this.fileSystemManager.splitMetaLog(currentMetaServer);
1005 }
1006 }
1007
1008 private void enableServerShutdownHandler(
1009 final boolean waitForMeta) throws IOException, InterruptedException {
1010
1011
1012
1013
1014
1015 if (!serverShutdownHandlerEnabled) {
1016 serverShutdownHandlerEnabled = true;
1017 this.serverManager.processQueuedDeadServers();
1018 }
1019
1020 if (waitForMeta) {
1021 metaTableLocator.waitMetaRegionLocation(this.getZooKeeper());
1022
1023
1024 this.assignmentManager.waitForAssignment(HRegionInfo.FIRST_META_REGIONINFO);
1025 }
1026 }
1027
1028 private void enableMeta(TableName metaTableName) {
1029 if (!this.assignmentManager.getTableStateManager().isTableState(metaTableName,
1030 ZooKeeperProtos.Table.State.ENABLED)) {
1031 this.assignmentManager.setEnabledTable(metaTableName);
1032 }
1033 }
1034
1035
1036
1037
1038
1039
1040 private Set<ServerName> getPreviouselyFailedMetaServersFromZK() throws KeeperException {
1041 Set<ServerName> result = new HashSet<ServerName>();
1042 String metaRecoveringZNode = ZKUtil.joinZNode(zooKeeper.recoveringRegionsZNode,
1043 HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
1044 List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(zooKeeper, metaRecoveringZNode);
1045 if (regionFailedServers == null) return result;
1046
1047 for(String failedServer : regionFailedServers) {
1048 ServerName server = ServerName.parseServerName(failedServer);
1049 result.add(server);
1050 }
1051 return result;
1052 }
1053
1054 @Override
1055 public TableDescriptors getTableDescriptors() {
1056 return this.tableDescriptors;
1057 }
1058
1059 @Override
1060 public ServerManager getServerManager() {
1061 return this.serverManager;
1062 }
1063
1064 @Override
1065 public MasterFileSystem getMasterFileSystem() {
1066 return this.fileSystemManager;
1067 }
1068
1069
1070
1071
1072
1073
1074
1075
/**
 * Starts the master's executor pools, the procedure executor and the cleaner chores.
 * <p>
 * Pool sizes are read from configuration with the defaults shown below. Sets
 * {@code serviceStarted} once the log and HFile cleaners have been scheduled.
 *
 * @throws IOException if starting the procedure executor or building a cleaner fails
 */
private void startServiceThreads() throws IOException{
  // Executor pools, each sized from its own configuration key.
  this.service.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
    conf.getInt("hbase.master.executor.openregion.threads", 5));
  this.service.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
    conf.getInt("hbase.master.executor.closeregion.threads", 5));
  this.service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
    conf.getInt("hbase.master.executor.serverops.threads", 5));
  // NOTE: the meta server-operations pool reads the same key as the pool above.
  this.service.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
    conf.getInt("hbase.master.executor.serverops.threads", 5));
  this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
    conf.getInt("hbase.master.executor.logreplayops.threads", 10));

  // Table operations get exactly one thread; the size is hard-coded, not configurable.
  // NOTE(review): presumably so that table operations run one at a time — confirm.
  this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
  startProcedureExecutor();

  // Log cleaner chore over the old-log directory; interval defaults to 60 * 1000.
  int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
  this.logCleaner =
    new LogCleaner(cleanerInterval,
      this, conf, getMasterFileSystem().getFileSystem(),
      getMasterFileSystem().getOldLogDir());
  getChoreService().scheduleChore(logCleaner);

  // HFile cleaner chore over the archive directory, using the same interval.
  Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
  this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
    .getFileSystem(), archiveDir);
  getChoreService().scheduleChore(hfileCleaner);
  serviceStarted = true;
  if (LOG.isTraceEnabled()) {
    LOG.trace("Started service threads");
  }
  // The replication ZK lock cleaner is only scheduled when the ZOOKEEPER_USEMULTI setting
  // is false. A failure here is logged and does not abort start-up.
  if (!conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
    try {
      replicationZKLockCleanerChore = new ReplicationZKLockCleanerChore(this, this,
        cleanerInterval, this.getZooKeeper(), this.conf);
      getChoreService().scheduleChore(replicationZKLockCleanerChore);
    } catch (Exception e) {
      LOG.error("start replicationZKLockCleanerChore failed", e);
    }
  }
}
1124
1125 @Override
1126 protected void sendShutdownInterrupt() {
1127 super.sendShutdownInterrupt();
1128 stopProcedureExecutor();
1129 }
1130
1131 @Override
1132 protected void stopServiceThreads() {
1133 if (masterJettyServer != null) {
1134 LOG.info("Stopping master jetty server");
1135 try {
1136 masterJettyServer.stop();
1137 } catch (Exception e) {
1138 LOG.error("Failed to stop master jetty server", e);
1139 }
1140 }
1141 super.stopServiceThreads();
1142 stopChores();
1143
1144
1145
1146 if (!isAborted() && this.serverManager != null &&
1147 this.serverManager.isClusterShutdown()) {
1148 this.serverManager.letRegionServersShutdown();
1149 }
1150 if (LOG.isDebugEnabled()) {
1151 LOG.debug("Stopping service threads");
1152 }
1153
1154 if (this.logCleaner != null) this.logCleaner.cancel(true);
1155 if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
1156 if (this.replicationZKLockCleanerChore != null) this.replicationZKLockCleanerChore.cancel(true);
1157 if (this.quotaManager != null) this.quotaManager.stop();
1158 if (this.activeMasterManager != null) this.activeMasterManager.stop();
1159 if (this.serverManager != null) this.serverManager.stop();
1160 if (this.assignmentManager != null) this.assignmentManager.stop();
1161 if (this.fileSystemManager != null) this.fileSystemManager.stop();
1162 if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
1163 }
1164
1165
1166
1167
1168 @Override
1169 public boolean isMasterProcedureExecutorEnabled() {
1170 return (this.procedureConf == ProcedureConf.PROCEDURE_ENABLED);
1171 }
1172
1173 private void startProcedureExecutor() throws IOException {
1174 final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
1175 final Path logDir = new Path(fileSystemManager.getRootDir(),
1176 MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
1177
1178 if (this.procedureConf == ProcedureConf.PROCEDURE_FULLY_DISABLED) {
1179
1180
1181
1182 try {
1183 fs.delete(logDir, true);
1184 LOG.warn("Procedure executor is disabled from configuartion. " +
1185 "All the state logs from procedure store were removed." +
1186 "You should check the cluster state using HBCK.");
1187 } catch (Exception e) {
1188
1189 LOG.error("Removing all the state logs from procedure store failed." +
1190 "You should check the cluster state using HBCK.");
1191 }
1192 return;
1193 }
1194
1195 procedureStore = new WALProcedureStore(conf, fileSystemManager.getFileSystem(), logDir,
1196 new MasterProcedureEnv.WALStoreLeaseRecovery(this));
1197
1198 procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
1199 procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore,
1200 procEnv.getProcedureQueue());
1201
1202 final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
1203 Math.max(Runtime.getRuntime().availableProcessors(),
1204 MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
1205 procedureStore.start(numThreads);
1206 procedureExecutor.start(numThreads);
1207 }
1208
1209 private void stopProcedureExecutor() {
1210 if (procedureExecutor != null) {
1211 procedureExecutor.stop();
1212 }
1213
1214 if (procedureStore != null) {
1215 procedureStore.stop(isAborted());
1216 }
1217 }
1218
1219 private void stopChores() {
1220 if (this.balancerChore != null) {
1221 this.balancerChore.cancel(true);
1222 }
1223 if (this.clusterStatusChore != null) {
1224 this.clusterStatusChore.cancel(true);
1225 }
1226 if (this.catalogJanitorChore != null) {
1227 this.catalogJanitorChore.cancel(true);
1228 }
1229 if (this.clusterStatusPublisherChore != null){
1230 clusterStatusPublisherChore.cancel(true);
1231 }
1232 }
1233
1234
1235
1236
1237
1238 InetAddress getRemoteInetAddress(final int port,
1239 final long serverStartCode) throws UnknownHostException {
1240
1241
1242 InetAddress ia = RpcServer.getRemoteIp();
1243
1244
1245
1246 if (ia == null && serverStartCode == startcode) {
1247 InetSocketAddress isa = rpcServices.getSocketAddress();
1248 if (isa != null && isa.getPort() == port) {
1249 ia = isa.getAddress();
1250 }
1251 }
1252 return ia;
1253 }
1254
1255
1256
1257
1258 private int getBalancerCutoffTime() {
1259 int balancerCutoffTime =
1260 getConfiguration().getInt("hbase.balancer.max.balancing", -1);
1261 if (balancerCutoffTime == -1) {
1262
1263 int balancerPeriod =
1264 getConfiguration().getInt("hbase.balancer.period", 300000);
1265 balancerCutoffTime = balancerPeriod;
1266
1267 if (balancerCutoffTime <= 0) balancerCutoffTime = balancerPeriod;
1268 }
1269 return balancerCutoffTime;
1270 }
1271
/**
 * Runs one balancing pass: asks the balancer for region plans per table and executes
 * them until done or until the cutoff time would be exceeded.
 *
 * @return {@code false} when the run was skipped (master not initialized, balancer
 *         switched off, regions in transition, dead servers being processed, or the
 *         coprocessor pre-hook bypassed/failed); {@code true} otherwise, including when
 *         no plans were produced or the run was cut short by the time limit
 * @throws IOException declared by the signature
 */
public boolean balance() throws IOException {
  // Nothing to balance until master initialization has finished.
  if (!this.initialized) {
    LOG.debug("Master has not been initialized, don't run balancer.");
    return false;
  }
  // Budget in milliseconds (it is added to System.currentTimeMillis() below).
  int maximumBalanceTime = getBalancerCutoffTime();
  synchronized (this.balancer) {
    // Honor the balancer on/off switch.
    if (!this.loadBalancerTracker.isBalancerOn()) return false;
    // Do not balance while any region is in transition.
    if (this.assignmentManager.getRegionStates().isRegionsInTransition()) {
      Map<String, RegionState> regionsInTransition =
        this.assignmentManager.getRegionStates().getRegionsInTransition();
      LOG.debug("Not running balancer because " + regionsInTransition.size() +
        " region(s) in transition: " + org.apache.commons.lang.StringUtils.
          abbreviate(regionsInTransition.toString(), 256));
      return false;
    }
    // Do not balance while dead servers are still being processed.
    if (this.serverManager.areDeadServersInProgress()) {
      LOG.debug("Not running balancer because processing dead regionserver(s): " +
        this.serverManager.getDeadServers());
      return false;
    }

    // Coprocessor pre-hook: a true return or an IOException both skip this run.
    if (this.cpHost != null) {
      try {
        if (this.cpHost.preBalance()) {
          LOG.debug("Coprocessor bypassing balancer request");
          return false;
        }
      } catch (IOException ioe) {
        LOG.error("Error invoking master coprocessor preBalance()", ioe);
        return false;
      }
    }

    Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable =
      this.assignmentManager.getRegionStates().getAssignmentsByTable();

    List<RegionPlan> plans = new ArrayList<RegionPlan>();
    // Give the balancer the current cluster status, then collect plans table by table.
    this.balancer.setClusterStatus(getClusterStatus());
    for (Map<ServerName, List<HRegionInfo>> assignments : assignmentsByTable.values()) {
      List<RegionPlan> partialPlans = this.balancer.balanceCluster(assignments);
      if (partialPlans != null) plans.addAll(partialPlans);
    }
    long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
    int rpCount = 0;  // number of plans executed so far
    long totalRegPlanExecTime = 0;  // accumulated execution time of those plans
    if (plans != null && !plans.isEmpty()) {
      for (RegionPlan plan: plans) {
        LOG.info("balance " + plan);
        long balStartTime = System.currentTimeMillis();
        this.assignmentManager.balance(plan);
        totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;
        rpCount++;
        if (rpCount < plans.size() &&
            // Stop if one more plan, at the average cost so far, would pass the cutoff.
            (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
          LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
            maximumBalanceTime);
          break;
        }
      }
    }
    // Post-hook receives only the plans that were actually executed.
    if (this.cpHost != null) {
      try {
        this.cpHost.postBalance(rpCount < plans.size() ? plans.subList(0, rpCount) : plans);
      } catch (IOException ioe) {
        // A post-hook failure is logged only; the run still counts as done.
        LOG.error("Error invoking master coprocessor postBalance()", ioe);
      }
    }
  }
  // Reaching this point means the run was not skipped, whether or not plans were executed.
  return true;
}
1354
1355
1356
1357
1358 String getClientIdAuditPrefix() {
1359 return "Client=" + RpcServer.getRequestUserName() + "/" + RpcServer.getRemoteAddress();
1360 }
1361
1362
1363
1364
1365
1366
1367
1368 public void setCatalogJanitorEnabled(final boolean b) {
1369 this.catalogJanitorChore.setEnabled(b);
1370 }
1371
1372 @Override
1373 public void dispatchMergingRegions(final HRegionInfo region_a,
1374 final HRegionInfo region_b, final boolean forcible) throws IOException {
1375 checkInitialized();
1376 this.service.submit(new DispatchMergingRegionHandler(this,
1377 this.catalogJanitorChore, region_a, region_b, forcible));
1378 }
1379
1380 void move(final byte[] encodedRegionName,
1381 final byte[] destServerName) throws HBaseIOException {
1382 RegionState regionState = assignmentManager.getRegionStates().
1383 getRegionState(Bytes.toString(encodedRegionName));
1384 if (regionState == null) {
1385 throw new UnknownRegionException(Bytes.toStringBinary(encodedRegionName));
1386 }
1387
1388 HRegionInfo hri = regionState.getRegion();
1389 ServerName dest;
1390 if (destServerName == null || destServerName.length == 0) {
1391 LOG.info("Passed destination servername is null/empty so " +
1392 "choosing a server at random");
1393 final List<ServerName> destServers = this.serverManager.createDestinationServersList(
1394 regionState.getServerName());
1395 dest = balancer.randomAssignment(hri, destServers);
1396 if (dest == null) {
1397 LOG.debug("Unable to determine a plan to assign " + hri);
1398 return;
1399 }
1400 } else {
1401 dest = ServerName.valueOf(Bytes.toString(destServerName));
1402 if (dest.equals(serverName) && balancer instanceof BaseLoadBalancer
1403 && !((BaseLoadBalancer)balancer).shouldBeOnMaster(hri)) {
1404
1405
1406
1407 LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1408 + " to avoid unnecessary region moving later by load balancer,"
1409 + " because it should not be on master");
1410 return;
1411 }
1412 }
1413
1414 if (dest.equals(regionState.getServerName())) {
1415 LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1416 + " because region already assigned to the same server " + dest + ".");
1417 return;
1418 }
1419
1420
1421 RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest);
1422
1423 try {
1424 checkInitialized();
1425 if (this.cpHost != null) {
1426 if (this.cpHost.preMove(hri, rp.getSource(), rp.getDestination())) {
1427 return;
1428 }
1429 }
1430
1431
1432
1433 serverManager.sendRegionWarmup(rp.getDestination(), hri);
1434
1435 LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
1436 this.assignmentManager.balance(rp);
1437 if (this.cpHost != null) {
1438 this.cpHost.postMove(hri, rp.getSource(), rp.getDestination());
1439 }
1440 } catch (IOException ioe) {
1441 if (ioe instanceof HBaseIOException) {
1442 throw (HBaseIOException)ioe;
1443 }
1444 throw new HBaseIOException(ioe);
1445 }
1446 }
1447
1448 @Override
1449 public long createTable(
1450 final HTableDescriptor hTableDescriptor,
1451 final byte [][] splitKeys,
1452 final long nonceGroup,
1453 final long nonce) throws IOException {
1454 if (isStopped()) {
1455 throw new MasterNotRunningException();
1456 }
1457
1458 TableName tableName = hTableDescriptor.getTableName();
1459 String namespace = tableName.getNamespaceAsString();
1460 ensureNamespaceExists(namespace);
1461
1462 HRegionInfo[] newRegions = ModifyRegionUtils.createHRegionInfos(hTableDescriptor, splitKeys);
1463 checkInitialized();
1464 sanityCheckTableDescriptor(hTableDescriptor);
1465 if (cpHost != null) {
1466 cpHost.preCreateTable(hTableDescriptor, newRegions);
1467 }
1468 LOG.info(getClientIdAuditPrefix() + " create " + hTableDescriptor);
1469
1470 long procId = -1;
1471 if (isMasterProcedureExecutorEnabled()) {
1472
1473
1474 ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
1475 procId = this.procedureExecutor.submitProcedure(
1476 new CreateTableProcedure(
1477 procedureExecutor.getEnvironment(), hTableDescriptor, newRegions, latch),
1478 nonceGroup,
1479 nonce);
1480 latch.await();
1481 } else {
1482 try {
1483 this.quotaManager.checkNamespaceTableAndRegionQuota(tableName, newRegions.length);
1484 this.service.submit(new CreateTableHandler(this, this.fileSystemManager, hTableDescriptor,
1485 conf, newRegions, this).prepare());
1486 } catch (IOException e) {
1487 this.quotaManager.removeTableFromNamespaceQuota(tableName);
1488 LOG.error("Exception occurred while creating the table " + tableName.getNameAsString(), e);
1489 throw e;
1490 }
1491 }
1492
1493 if (cpHost != null) {
1494 cpHost.postCreateTable(hTableDescriptor, newRegions);
1495 }
1496
1497 return procId;
1498 }
1499
1500
1501
1502
1503
1504
/**
 * Validates a table descriptor against a series of sanity rules.
 * <p>
 * Each violated rule is reported through {@code warnOrThrowExceptionForFailure}. When
 * {@code hbase.table.sanity.checks} is false in the configuration, or is set to a
 * non-true value on the table descriptor, violations are only logged; otherwise the
 * first violation throws.
 *
 * @param htd table descriptor to validate
 * @throws IOException when a rule is violated and the checks are not bypassed
 */
private void sanityCheckTableDescriptor(final HTableDescriptor htd) throws IOException {
  final String CONF_KEY = "hbase.table.sanity.checks";
  // logWarn == true means "warn only": disabled cluster-wide ...
  boolean logWarn = false;
  if (!conf.getBoolean(CONF_KEY, true)) {
    logWarn = true;
  }
  // ... or overridden on the table descriptor itself.
  String tableVal = htd.getConfigurationValue(CONF_KEY);
  if (tableVal != null && !Boolean.valueOf(tableVal)) {
    logWarn = true;
  }

  // Max file size: a negative descriptor value falls back to the configured region max
  // file size. The lower limit defaults to 2 * 1024 * 1024.
  long maxFileSizeLowerLimit = 2 * 1024 * 1024L;
  long maxFileSize = htd.getMaxFileSize();
  if (maxFileSize < 0) {
    maxFileSize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, maxFileSizeLowerLimit);
  }
  if (maxFileSize < conf.getLong("hbase.hregion.max.filesize.limit", maxFileSizeLowerLimit)) {
    String message = "MAX_FILESIZE for table descriptor or "
        + "\"hbase.hregion.max.filesize\" (" + maxFileSize
        + ") is too small, which might cause over splitting into unmanageable "
        + "number of regions.";
    warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
  }

  // Memstore flush size: same pattern, with a default lower limit of 1024 * 1024.
  long flushSizeLowerLimit = 1024 * 1024L;
  long flushSize = htd.getMemStoreFlushSize();
  if (flushSize < 0) {
    flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeLowerLimit);
  }
  if (flushSize < conf.getLong("hbase.hregion.memstore.flush.size.limit", flushSizeLowerLimit)) {
    String message = "MEMSTORE_FLUSHSIZE for table descriptor or "
        + "\"hbase.hregion.memstore.flush.size\" ("+flushSize+") is too small, which might cause"
        + " very frequent flushing.";
    warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
  }

  // Split policy and coprocessor classes must be loadable. Only the message of a failure
  // is forwarded here; the cause is passed as null.
  try {
    checkClassLoading(conf, htd);
  } catch (Exception ex) {
    warnOrThrowExceptionForFailure(logWarn, CONF_KEY, ex.getMessage(), null);
  }

  // Compression settings of every column family.
  try {
    checkCompression(htd);
  } catch (IOException e) {
    warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
  }

  // Encryption settings of every column family.
  try {
    checkEncryption(conf, htd);
  } catch (IOException e) {
    warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
  }

  // A table needs at least one column family.
  if (htd.getColumnFamilies().length == 0) {
    String message = "Table should have at least one column family.";
    warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
  }

  // Per-column-family rules.
  for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
    // TTL of zero or less is rejected.
    if (hcd.getTimeToLive() <= 0) {
      String message = "TTL for column family " + hcd.getNameAsString() + " must be positive.";
      warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
    }

    // Block size must lie within [1024, 16 * 1024 * 1024].
    if (hcd.getBlocksize() < 1024 || hcd.getBlocksize() > 16 * 1024 * 1024) {
      String message = "Block size for column family " + hcd.getNameAsString()
          + " must be between 1K and 16MB.";
      warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
    }

    // Min versions: only negative values are rejected (zero passes this check).
    if (hcd.getMinVersions() < 0) {
      String message = "Min versions for column family " + hcd.getNameAsString()
        + " must be positive.";
      warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
    }

    // Min versions may equal max versions; only min > max is rejected.
    if (hcd.getMinVersions() > hcd.getMaxVersions()) {
      String message = "Min versions for column family " + hcd.getNameAsString()
          + " must be less than the Max versions.";
      warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
    }

    // Replication scope: only negative values are rejected.
    if (hcd.getScope() < 0) {
      String message = "Replication scope for column family "
        + hcd.getNameAsString() + " must be positive.";
      warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
    }

  }
}
1610
1611
1612 private static void warnOrThrowExceptionForFailure(boolean logWarn, String confKey,
1613 String message, Exception cause) throws IOException {
1614 if (!logWarn) {
1615 throw new DoNotRetryIOException(message + " Set " + confKey +
1616 " to false at conf or table descriptor if you want to bypass sanity checks", cause);
1617 }
1618 LOG.warn(message);
1619 }
1620
1621 private void startActiveMasterManager(int infoPort) throws KeeperException {
1622 String backupZNode = ZKUtil.joinZNode(
1623 zooKeeper.backupMasterAddressesZNode, serverName.toString());
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634 LOG.info("Adding backup master ZNode " + backupZNode);
1635 if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode,
1636 serverName, infoPort)) {
1637 LOG.warn("Failed create of " + backupZNode + " by " + serverName);
1638 }
1639
1640 activeMasterManager.setInfoPort(infoPort);
1641
1642 Threads.setDaemonThreadRunning(new Thread(new Runnable() {
1643 @Override
1644 public void run() {
1645 int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT,
1646 HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
1647
1648 if (conf.getBoolean(HConstants.MASTER_TYPE_BACKUP,
1649 HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
1650 LOG.debug("HMaster started in backup mode. "
1651 + "Stalling until master znode is written.");
1652
1653
1654 while (!activeMasterManager.hasActiveMaster()) {
1655 LOG.debug("Waiting for master address ZNode to be written "
1656 + "(Also watching cluster state node)");
1657 Threads.sleep(timeout);
1658 }
1659 }
1660 MonitoredTask status = TaskMonitor.get().createStatus("Master startup");
1661 status.setDescription("Master startup");
1662 try {
1663 if (activeMasterManager.blockUntilBecomingActiveMaster(timeout, status)) {
1664 finishActiveMasterInitialization(status);
1665 }
1666 } catch (Throwable t) {
1667 status.setStatus("Failed to become active: " + t.getMessage());
1668 LOG.fatal("Failed to become active master", t);
1669
1670 if (t instanceof NoClassDefFoundError &&
1671 t.getMessage().contains("org/apache/hadoop/hdfs/protocol/FSConstants$SafeModeAction")) {
1672
1673 abort("HBase is having a problem with its Hadoop jars. You may need to "
1674 + "recompile HBase against Hadoop version "
1675 + org.apache.hadoop.util.VersionInfo.getVersion()
1676 + " or change your hadoop jars to start properly", t);
1677 } else {
1678 abort("Unhandled exception. Starting shutdown.", t);
1679 }
1680 } finally {
1681 status.cleanup();
1682 }
1683 }
1684 }, getServerName().toShortString() + ".activeMasterManager"));
1685 }
1686
1687 private void checkCompression(final HTableDescriptor htd)
1688 throws IOException {
1689 if (!this.masterCheckCompression) return;
1690 for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1691 checkCompression(hcd);
1692 }
1693 }
1694
1695 private void checkCompression(final HColumnDescriptor hcd)
1696 throws IOException {
1697 if (!this.masterCheckCompression) return;
1698 CompressionTest.testCompression(hcd.getCompression());
1699 CompressionTest.testCompression(hcd.getCompactionCompression());
1700 }
1701
1702 private void checkEncryption(final Configuration conf, final HTableDescriptor htd)
1703 throws IOException {
1704 if (!this.masterCheckEncryption) return;
1705 for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1706 checkEncryption(conf, hcd);
1707 }
1708 }
1709
1710 private void checkEncryption(final Configuration conf, final HColumnDescriptor hcd)
1711 throws IOException {
1712 if (!this.masterCheckEncryption) return;
1713 EncryptionTest.testEncryption(conf, hcd.getEncryptionType(), hcd.getEncryptionKey());
1714 }
1715
1716 private void checkClassLoading(final Configuration conf, final HTableDescriptor htd)
1717 throws IOException {
1718 RegionSplitPolicy.getSplitPolicyClass(htd, conf);
1719 RegionCoprocessorHost.testTableCoprocessorAttrs(conf, htd);
1720 }
1721
1722 private static boolean isCatalogTable(final TableName tableName) {
1723 return tableName.equals(TableName.META_TABLE_NAME);
1724 }
1725
1726 @Override
1727 public long deleteTable(
1728 final TableName tableName,
1729 final long nonceGroup,
1730 final long nonce) throws IOException {
1731 checkInitialized();
1732 if (cpHost != null) {
1733 cpHost.preDeleteTable(tableName);
1734 }
1735 LOG.info(getClientIdAuditPrefix() + " delete " + tableName);
1736
1737 long procId = -1;
1738 if (isMasterProcedureExecutorEnabled()) {
1739
1740 ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
1741 procId = this.procedureExecutor.submitProcedure(
1742 new DeleteTableProcedure(procedureExecutor.getEnvironment(), tableName, latch),
1743 nonceGroup,
1744 nonce);
1745 latch.await();
1746 } else {
1747 this.service.submit(new DeleteTableHandler(tableName, this, this).prepare());
1748 }
1749
1750 if (cpHost != null) {
1751 cpHost.postDeleteTable(tableName);
1752 }
1753
1754 return procId;
1755 }
1756
1757 @Override
1758 public void truncateTable(
1759 final TableName tableName,
1760 final boolean preserveSplits,
1761 final long nonceGroup,
1762 final long nonce) throws IOException {
1763 checkInitialized();
1764 if (cpHost != null) {
1765 cpHost.preTruncateTable(tableName);
1766 }
1767 LOG.info(getClientIdAuditPrefix() + " truncate " + tableName);
1768
1769 if (isMasterProcedureExecutorEnabled()) {
1770 long procId = this.procedureExecutor.submitProcedure(
1771 new TruncateTableProcedure(procedureExecutor.getEnvironment(), tableName, preserveSplits),
1772 nonceGroup,
1773 nonce);
1774 ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1775 } else {
1776 TruncateTableHandler handler =
1777 new TruncateTableHandler(tableName, this, this, preserveSplits);
1778 handler.prepare();
1779 handler.process();
1780 }
1781
1782 if (cpHost != null) {
1783 cpHost.postTruncateTable(tableName);
1784 }
1785 }
1786
1787 @Override
1788 public void addColumn(
1789 final TableName tableName,
1790 final HColumnDescriptor columnDescriptor,
1791 final long nonceGroup,
1792 final long nonce)
1793 throws IOException {
1794 checkInitialized();
1795 checkCompression(columnDescriptor);
1796 checkEncryption(conf, columnDescriptor);
1797 if (cpHost != null) {
1798 if (cpHost.preAddColumn(tableName, columnDescriptor)) {
1799 return;
1800 }
1801 }
1802
1803 LOG.info(getClientIdAuditPrefix() + " add " + columnDescriptor);
1804
1805 if (isMasterProcedureExecutorEnabled()) {
1806
1807 long procId = this.procedureExecutor.submitProcedure(
1808 new AddColumnFamilyProcedure(
1809 procedureExecutor.getEnvironment(), tableName, columnDescriptor),
1810 nonceGroup,
1811 nonce);
1812 ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1813 } else {
1814 new TableAddFamilyHandler(tableName, columnDescriptor, this, this).prepare().process();
1815 }
1816 if (cpHost != null) {
1817 cpHost.postAddColumn(tableName, columnDescriptor);
1818 }
1819 }
1820
1821 @Override
1822 public void modifyColumn(
1823 final TableName tableName,
1824 final HColumnDescriptor descriptor,
1825 final long nonceGroup,
1826 final long nonce)
1827 throws IOException {
1828 checkInitialized();
1829 checkCompression(descriptor);
1830 checkEncryption(conf, descriptor);
1831 if (cpHost != null) {
1832 if (cpHost.preModifyColumn(tableName, descriptor)) {
1833 return;
1834 }
1835 }
1836 LOG.info(getClientIdAuditPrefix() + " modify " + descriptor);
1837
1838 if (isMasterProcedureExecutorEnabled()) {
1839
1840 long procId = this.procedureExecutor.submitProcedure(
1841 new ModifyColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, descriptor),
1842 nonceGroup,
1843 nonce);
1844 ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1845 } else {
1846 new TableModifyFamilyHandler(tableName, descriptor, this, this).prepare().process();
1847 }
1848
1849 if (cpHost != null) {
1850 cpHost.postModifyColumn(tableName, descriptor);
1851 }
1852 }
1853
1854 @Override
1855 public void deleteColumn(
1856 final TableName tableName,
1857 final byte[] columnName,
1858 final long nonceGroup,
1859 final long nonce)
1860 throws IOException {
1861 checkInitialized();
1862 if (cpHost != null) {
1863 if (cpHost.preDeleteColumn(tableName, columnName)) {
1864 return;
1865 }
1866 }
1867 LOG.info(getClientIdAuditPrefix() + " delete " + Bytes.toString(columnName));
1868
1869 if (isMasterProcedureExecutorEnabled()) {
1870
1871 long procId = this.procedureExecutor.submitProcedure(
1872 new DeleteColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, columnName),
1873 nonceGroup,
1874 nonce);
1875 ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1876 } else {
1877 new TableDeleteFamilyHandler(tableName, columnName, this, this).prepare().process();
1878 }
1879
1880 if (cpHost != null) {
1881 cpHost.postDeleteColumn(tableName, columnName);
1882 }
1883 }
1884
1885 @Override
1886 public long enableTable(
1887 final TableName tableName,
1888 final long nonceGroup,
1889 final long nonce) throws IOException {
1890 checkInitialized();
1891 if (cpHost != null) {
1892 cpHost.preEnableTable(tableName);
1893 }
1894 LOG.info(getClientIdAuditPrefix() + " enable " + tableName);
1895
1896 long procId = -1;
1897 if (isMasterProcedureExecutorEnabled()) {
1898
1899 final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
1900 procId = this.procedureExecutor.submitProcedure(
1901 new EnableTableProcedure(
1902 procedureExecutor.getEnvironment(), tableName, false, prepareLatch),
1903 nonceGroup,
1904 nonce);
1905 this.procedureExecutor.submitProcedure(new EnableTableProcedure(procedureExecutor
1906 .getEnvironment(), tableName, false, prepareLatch));
1907
1908
1909
1910
1911 prepareLatch.await();
1912 } else {
1913 this.service.submit(new EnableTableHandler(this, tableName,
1914 assignmentManager, tableLockManager, false).prepare());
1915 }
1916
1917 if (cpHost != null) {
1918 cpHost.postEnableTable(tableName);
1919 }
1920
1921 return procId;
1922 }
1923
1924 @Override
1925 public long disableTable(
1926 final TableName tableName,
1927 final long nonceGroup,
1928 final long nonce) throws IOException {
1929 checkInitialized();
1930 if (cpHost != null) {
1931 cpHost.preDisableTable(tableName);
1932 }
1933 LOG.info(getClientIdAuditPrefix() + " disable " + tableName);
1934
1935 long procId = -1;
1936 if (isMasterProcedureExecutorEnabled()) {
1937
1938 final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
1939 procId = this.procedureExecutor.submitProcedure(
1940 new DisableTableProcedure(
1941 procedureExecutor.getEnvironment(), tableName, false, prepareLatch),
1942 nonceGroup,
1943 nonce);
1944
1945
1946
1947
1948 prepareLatch.await();
1949 } else {
1950 this.service.submit(new DisableTableHandler(this, tableName,
1951 assignmentManager, tableLockManager, false).prepare());
1952 }
1953
1954 if (cpHost != null) {
1955 cpHost.postDisableTable(tableName);
1956 }
1957
1958 return procId;
1959 }
1960
1961
1962
1963
1964
1965
1966
1967 @VisibleForTesting
1968 Pair<HRegionInfo, ServerName> getTableRegionForRow(
1969 final TableName tableName, final byte [] rowKey)
1970 throws IOException {
1971 final AtomicReference<Pair<HRegionInfo, ServerName>> result =
1972 new AtomicReference<Pair<HRegionInfo, ServerName>>(null);
1973
1974 MetaScannerVisitor visitor =
1975 new MetaScannerVisitorBase() {
1976 @Override
1977 public boolean processRow(Result data) throws IOException {
1978 if (data == null || data.size() <= 0) {
1979 return true;
1980 }
1981 Pair<HRegionInfo, ServerName> pair = HRegionInfo.getHRegionInfoAndServerName(data);
1982 if (pair == null) {
1983 return false;
1984 }
1985 if (!pair.getFirst().getTable().equals(tableName)) {
1986 return false;
1987 }
1988 result.set(pair);
1989 return true;
1990 }
1991 };
1992
1993 MetaScanner.metaScan(clusterConnection, visitor, tableName, rowKey, 1);
1994 return result.get();
1995 }
1996
1997 @Override
1998 public void modifyTable(
1999 final TableName tableName,
2000 final HTableDescriptor descriptor,
2001 final long nonceGroup,
2002 final long nonce)
2003 throws IOException {
2004 checkInitialized();
2005 sanityCheckTableDescriptor(descriptor);
2006 if (cpHost != null) {
2007 cpHost.preModifyTable(tableName, descriptor);
2008 }
2009
2010 LOG.info(getClientIdAuditPrefix() + " modify " + tableName);
2011
2012 if (isMasterProcedureExecutorEnabled()) {
2013
2014 long procId = this.procedureExecutor.submitProcedure(
2015 new ModifyTableProcedure(procedureExecutor.getEnvironment(), descriptor),
2016 nonceGroup,
2017 nonce);
2018 ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
2019 } else {
2020 new ModifyTableHandler(tableName, descriptor, this, this).prepare().process();
2021 }
2022
2023 if (cpHost != null) {
2024 cpHost.postModifyTable(tableName, descriptor);
2025 }
2026 }
2027
2028 @Override
2029 public void checkTableModifiable(final TableName tableName)
2030 throws IOException, TableNotFoundException, TableNotDisabledException {
2031 if (isCatalogTable(tableName)) {
2032 throw new IOException("Can't modify catalog tables");
2033 }
2034 if (!MetaTableAccessor.tableExists(getConnection(), tableName)) {
2035 throw new TableNotFoundException(tableName);
2036 }
2037 if (!getAssignmentManager().getTableStateManager().
2038 isTableState(tableName, ZooKeeperProtos.Table.State.DISABLED)) {
2039 throw new TableNotDisabledException(tableName);
2040 }
2041 }
2042
2043
2044
2045
2046 public ClusterStatus getClusterStatus() throws InterruptedIOException {
2047
2048 List<String> backupMasterStrings;
2049 try {
2050 backupMasterStrings = ZKUtil.listChildrenNoWatch(this.zooKeeper,
2051 this.zooKeeper.backupMasterAddressesZNode);
2052 } catch (KeeperException e) {
2053 LOG.warn(this.zooKeeper.prefix("Unable to list backup servers"), e);
2054 backupMasterStrings = null;
2055 }
2056
2057 List<ServerName> backupMasters = null;
2058 if (backupMasterStrings != null && !backupMasterStrings.isEmpty()) {
2059 backupMasters = new ArrayList<ServerName>(backupMasterStrings.size());
2060 for (String s: backupMasterStrings) {
2061 try {
2062 byte [] bytes;
2063 try {
2064 bytes = ZKUtil.getData(this.zooKeeper, ZKUtil.joinZNode(
2065 this.zooKeeper.backupMasterAddressesZNode, s));
2066 } catch (InterruptedException e) {
2067 throw new InterruptedIOException();
2068 }
2069 if (bytes != null) {
2070 ServerName sn;
2071 try {
2072 sn = ServerName.parseFrom(bytes);
2073 } catch (DeserializationException e) {
2074 LOG.warn("Failed parse, skipping registering backup server", e);
2075 continue;
2076 }
2077 backupMasters.add(sn);
2078 }
2079 } catch (KeeperException e) {
2080 LOG.warn(this.zooKeeper.prefix("Unable to get information about " +
2081 "backup servers"), e);
2082 }
2083 }
2084 Collections.sort(backupMasters, new Comparator<ServerName>() {
2085 @Override
2086 public int compare(ServerName s1, ServerName s2) {
2087 return s1.getServerName().compareTo(s2.getServerName());
2088 }});
2089 }
2090
2091 String clusterId = fileSystemManager != null ?
2092 fileSystemManager.getClusterId().toString() : null;
2093 Map<String, RegionState> regionsInTransition = assignmentManager != null ?
2094 assignmentManager.getRegionStates().getRegionsInTransition() : null;
2095 String[] coprocessors = cpHost != null ? getMasterCoprocessors() : null;
2096 boolean balancerOn = loadBalancerTracker != null ?
2097 loadBalancerTracker.isBalancerOn() : false;
2098 Map<ServerName, ServerLoad> onlineServers = null;
2099 Set<ServerName> deadServers = null;
2100 if (serverManager != null) {
2101 deadServers = serverManager.getDeadServers().copyServerNames();
2102 onlineServers = serverManager.getOnlineServers();
2103 }
2104 return new ClusterStatus(VersionInfo.getVersion(), clusterId,
2105 onlineServers, deadServers, serverName, backupMasters,
2106 regionsInTransition, coprocessors, balancerOn);
2107 }
2108
2109
2110
2111
2112
2113
2114
2115
2116 public static String getLoadedCoprocessors() {
2117 return CoprocessorHost.getLoadedCoprocessors().toString();
2118 }
2119
2120
2121
2122
2123 public long getMasterStartTime() {
2124 return startcode;
2125 }
2126
2127
2128
2129
2130 public long getMasterActiveTime() {
2131 return masterActiveTime;
2132 }
2133
2134 public int getRegionServerInfoPort(final ServerName sn) {
2135 RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
2136 if (info == null || info.getInfoPort() == 0) {
2137 return conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
2138 HConstants.DEFAULT_REGIONSERVER_INFOPORT);
2139 }
2140 return info.getInfoPort();
2141 }
2142
2143
2144
2145
2146 public String[] getMasterCoprocessors() {
2147 Set<String> masterCoprocessors = getMasterCoprocessorHost().getCoprocessors();
2148 return masterCoprocessors.toArray(new String[masterCoprocessors.size()]);
2149 }
2150
2151 @Override
2152 public void abort(final String msg, final Throwable t) {
2153 if (isAborted() || isStopped()) {
2154 return;
2155 }
2156 if (cpHost != null) {
2157
2158 LOG.fatal("Master server abort: loaded coprocessors are: " +
2159 getLoadedCoprocessors());
2160 }
2161 if (t != null) LOG.fatal(msg, t);
2162 stop(msg);
2163 }
2164
2165 @Override
2166 public ZooKeeperWatcher getZooKeeper() {
2167 return zooKeeper;
2168 }
2169
2170 @Override
2171 public MasterCoprocessorHost getMasterCoprocessorHost() {
2172 return cpHost;
2173 }
2174
2175 @Override
2176 public MasterQuotaManager getMasterQuotaManager() {
2177 return quotaManager;
2178 }
2179
2180 @Override
2181 public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
2182 return procedureExecutor;
2183 }
2184
2185 @Override
2186 public ServerName getServerName() {
2187 return this.serverName;
2188 }
2189
2190 @Override
2191 public AssignmentManager getAssignmentManager() {
2192 return this.assignmentManager;
2193 }
2194
2195 public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
2196 return rsFatals;
2197 }
2198
2199 public void shutdown() {
2200 if (cpHost != null) {
2201 try {
2202 cpHost.preShutdown();
2203 } catch (IOException ioe) {
2204 LOG.error("Error call master coprocessor preShutdown()", ioe);
2205 }
2206 }
2207
2208 if (this.serverManager != null) {
2209 this.serverManager.shutdownCluster();
2210 }
2211 if (this.clusterStatusTracker != null){
2212 try {
2213 this.clusterStatusTracker.setClusterDown();
2214 } catch (KeeperException e) {
2215 LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
2216 }
2217 }
2218 }
2219
2220 public void stopMaster() {
2221 if (cpHost != null) {
2222 try {
2223 cpHost.preStopMaster();
2224 } catch (IOException ioe) {
2225 LOG.error("Error call master coprocessor preStopMaster()", ioe);
2226 }
2227 }
2228 stop("Stopped by " + Thread.currentThread().getName());
2229 }
2230
2231 void checkServiceStarted() throws ServerNotRunningYetException {
2232 if (!serviceStarted) {
2233 throw new ServerNotRunningYetException("Server is not running yet");
2234 }
2235 }
2236
2237 void checkInitialized() throws PleaseHoldException, ServerNotRunningYetException {
2238 checkServiceStarted();
2239 if (!this.initialized) {
2240 throw new PleaseHoldException("Master is initializing");
2241 }
2242 }
2243
2244 void checkNamespaceManagerReady() throws IOException {
2245 checkInitialized();
2246 if (tableNamespaceManager == null ||
2247 !tableNamespaceManager.isTableAvailableAndInitialized()) {
2248 throw new IOException("Table Namespace Manager not ready yet, try again later");
2249 }
2250 }
2251
2252
2253
2254
2255
2256
2257
2258
2259 public boolean isActiveMaster() {
2260 return isActiveMaster;
2261 }
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272 @Override
2273 public boolean isInitialized() {
2274 return initialized;
2275 }
2276
2277
2278
2279
2280
2281
2282 @Override
2283 public boolean isServerShutdownHandlerEnabled() {
2284 return this.serverShutdownHandlerEnabled;
2285 }
2286
2287
2288
2289
2290
2291 public boolean isInitializationStartsMetaRegionAssignment() {
2292 return this.initializationBeforeMetaAssignment;
2293 }
2294
2295 public void assignRegion(HRegionInfo hri) {
2296 assignmentManager.assign(hri, true);
2297 }
2298
2299
2300
2301
2302
2303
2304
2305 public double getAverageLoad() {
2306 if (this.assignmentManager == null) {
2307 return 0;
2308 }
2309
2310 RegionStates regionStates = this.assignmentManager.getRegionStates();
2311 if (regionStates == null) {
2312 return 0;
2313 }
2314 return regionStates.getAverageLoad();
2315 }
2316
2317 @Override
2318 public boolean registerService(Service instance) {
2319
2320
2321
2322 Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
2323 if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
2324 LOG.error("Coprocessor service "+serviceDesc.getFullName()+
2325 " already registered, rejecting request from "+instance
2326 );
2327 return false;
2328 }
2329
2330 coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
2331 if (LOG.isDebugEnabled()) {
2332 LOG.debug("Registered master coprocessor service: service="+serviceDesc.getFullName());
2333 }
2334 return true;
2335 }
2336
2337
2338
2339
2340
2341
2342
2343 public static HMaster constructMaster(Class<? extends HMaster> masterClass,
2344 final Configuration conf, final CoordinatedStateManager cp) {
2345 try {
2346 Constructor<? extends HMaster> c =
2347 masterClass.getConstructor(Configuration.class, CoordinatedStateManager.class);
2348 return c.newInstance(conf, cp);
2349 } catch (InvocationTargetException ite) {
2350 Throwable target = ite.getTargetException() != null?
2351 ite.getTargetException(): ite;
2352 if (target.getCause() != null) target = target.getCause();
2353 throw new RuntimeException("Failed construction of Master: " +
2354 masterClass.toString(), target);
2355 } catch (Exception e) {
2356 throw new RuntimeException("Failed construction of Master: " +
2357 masterClass.toString() + ((e.getCause() != null)?
2358 e.getCause().getMessage(): ""), e);
2359 }
2360 }
2361
2362
2363
2364
2365 public static void main(String [] args) {
2366 VersionInfo.logVersion();
2367 new HMasterCommandLine(HMaster.class).doMain(args);
2368 }
2369
2370 public HFileCleaner getHFileCleaner() {
2371 return this.hfileCleaner;
2372 }
2373
2374
2375
2376
2377
2378 public SnapshotManager getSnapshotManagerForTesting() {
2379 return this.snapshotManager;
2380 }
2381
2382 @Override
2383 public void createNamespace(NamespaceDescriptor descriptor) throws IOException {
2384 TableName.isLegalNamespaceName(Bytes.toBytes(descriptor.getName()));
2385 checkNamespaceManagerReady();
2386 if (cpHost != null) {
2387 if (cpHost.preCreateNamespace(descriptor)) {
2388 return;
2389 }
2390 }
2391 LOG.info(getClientIdAuditPrefix() + " creating " + descriptor);
2392 tableNamespaceManager.create(descriptor);
2393 if (cpHost != null) {
2394 cpHost.postCreateNamespace(descriptor);
2395 }
2396 }
2397
2398 @Override
2399 public void modifyNamespace(NamespaceDescriptor descriptor) throws IOException {
2400 TableName.isLegalNamespaceName(Bytes.toBytes(descriptor.getName()));
2401 checkNamespaceManagerReady();
2402 if (cpHost != null) {
2403 if (cpHost.preModifyNamespace(descriptor)) {
2404 return;
2405 }
2406 }
2407 LOG.info(getClientIdAuditPrefix() + " modify " + descriptor);
2408 tableNamespaceManager.update(descriptor);
2409 if (cpHost != null) {
2410 cpHost.postModifyNamespace(descriptor);
2411 }
2412 }
2413
2414 @Override
2415 public void deleteNamespace(String name) throws IOException {
2416 checkNamespaceManagerReady();
2417 if (cpHost != null) {
2418 if (cpHost.preDeleteNamespace(name)) {
2419 return;
2420 }
2421 }
2422 LOG.info(getClientIdAuditPrefix() + " delete " + name);
2423 tableNamespaceManager.remove(name);
2424 if (cpHost != null) {
2425 cpHost.postDeleteNamespace(name);
2426 }
2427 }
2428
2429
2430
2431
2432
2433
2434
2435
2436 protected void ensureNamespaceExists(final String name)
2437 throws IOException, NamespaceNotFoundException {
2438 checkNamespaceManagerReady();
2439 NamespaceDescriptor nsd = tableNamespaceManager.get(name);
2440 if (nsd == null) {
2441 throw new NamespaceNotFoundException(name);
2442 }
2443 }
2444
2445 @Override
2446 public NamespaceDescriptor getNamespaceDescriptor(String name) throws IOException {
2447 checkNamespaceManagerReady();
2448
2449 if (cpHost != null) {
2450 cpHost.preGetNamespaceDescriptor(name);
2451 }
2452
2453 NamespaceDescriptor nsd = tableNamespaceManager.get(name);
2454 if (nsd == null) {
2455 throw new NamespaceNotFoundException(name);
2456 }
2457
2458 if (cpHost != null) {
2459 cpHost.postGetNamespaceDescriptor(nsd);
2460 }
2461
2462 return nsd;
2463 }
2464
2465 @Override
2466 public List<NamespaceDescriptor> listNamespaceDescriptors() throws IOException {
2467 checkNamespaceManagerReady();
2468
2469 final List<NamespaceDescriptor> descriptors = new ArrayList<NamespaceDescriptor>();
2470 boolean bypass = false;
2471 if (cpHost != null) {
2472 bypass = cpHost.preListNamespaceDescriptors(descriptors);
2473 }
2474
2475 if (!bypass) {
2476 descriptors.addAll(tableNamespaceManager.list());
2477
2478 if (cpHost != null) {
2479 cpHost.postListNamespaceDescriptors(descriptors);
2480 }
2481 }
2482 return descriptors;
2483 }
2484
2485 @Override
2486 public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning)
2487 throws IOException {
2488 if (cpHost != null) {
2489 cpHost.preAbortProcedure(this.procedureExecutor, procId);
2490 }
2491
2492 final boolean result = this.procedureExecutor.abort(procId, mayInterruptIfRunning);
2493
2494 if (cpHost != null) {
2495 cpHost.postAbortProcedure();
2496 }
2497
2498 return result;
2499 }
2500
2501 @Override
2502 public List<ProcedureInfo> listProcedures() throws IOException {
2503 if (cpHost != null) {
2504 cpHost.preListProcedures();
2505 }
2506
2507 final List<ProcedureInfo> procInfoList = this.procedureExecutor.listProcedures();
2508
2509 if (cpHost != null) {
2510 cpHost.postListProcedures(procInfoList);
2511 }
2512
2513 return procInfoList;
2514 }
2515
2516 @Override
2517 public List<HTableDescriptor> listTableDescriptorsByNamespace(String name) throws IOException {
2518 ensureNamespaceExists(name);
2519 return listTableDescriptors(name, null, null, true);
2520 }
2521
2522 @Override
2523 public List<TableName> listTableNamesByNamespace(String name) throws IOException {
2524 ensureNamespaceExists(name);
2525 return listTableNames(name, null, true);
2526 }
2527
2528
2529
2530
2531
2532
2533
2534
2535
2536
2537 public List<HTableDescriptor> listTableDescriptors(final String namespace, final String regex,
2538 final List<TableName> tableNameList, final boolean includeSysTables)
2539 throws IOException {
2540 final List<HTableDescriptor> descriptors = new ArrayList<HTableDescriptor>();
2541
2542 boolean bypass = false;
2543 if (cpHost != null) {
2544 bypass = cpHost.preGetTableDescriptors(tableNameList, descriptors);
2545
2546 bypass |= cpHost.preGetTableDescriptors(tableNameList, descriptors, regex);
2547 }
2548
2549 if (!bypass) {
2550 if (tableNameList == null || tableNameList.size() == 0) {
2551
2552 Collection<HTableDescriptor> htds;
2553 if (namespace != null && namespace.length() > 0) {
2554 htds = tableDescriptors.getByNamespace(namespace).values();
2555 } else {
2556 htds = tableDescriptors.getAll().values();
2557 }
2558
2559 for (HTableDescriptor desc: htds) {
2560 if (includeSysTables || !desc.getTableName().isSystemTable()) {
2561 descriptors.add(desc);
2562 }
2563 }
2564 } else {
2565 for (TableName s: tableNameList) {
2566 HTableDescriptor desc = tableDescriptors.get(s);
2567 if (desc != null) {
2568 descriptors.add(desc);
2569 }
2570 }
2571 }
2572
2573
2574 if (regex != null) {
2575 filterTablesByRegex(descriptors, Pattern.compile(regex));
2576 }
2577
2578 if (cpHost != null) {
2579 cpHost.postGetTableDescriptors(descriptors);
2580
2581 cpHost.postGetTableDescriptors(tableNameList, descriptors, regex);
2582 }
2583 }
2584 return descriptors;
2585 }
2586
2587
2588
2589
2590
2591
2592
2593
2594 public List<TableName> listTableNames(final String namespace, final String regex,
2595 final boolean includeSysTables) throws IOException {
2596 final List<HTableDescriptor> descriptors = new ArrayList<HTableDescriptor>();
2597
2598 boolean bypass = false;
2599 if (cpHost != null) {
2600 bypass = cpHost.preGetTableNames(descriptors, regex);
2601 }
2602
2603 if (!bypass) {
2604
2605 Collection<HTableDescriptor> htds;
2606 if (namespace != null && namespace.length() > 0) {
2607 htds = tableDescriptors.getByNamespace(namespace).values();
2608 } else {
2609 htds = tableDescriptors.getAll().values();
2610 }
2611
2612 for (HTableDescriptor htd: htds) {
2613 if (includeSysTables || !htd.getTableName().isSystemTable()) {
2614 descriptors.add(htd);
2615 }
2616 }
2617
2618
2619 if (regex != null) {
2620 filterTablesByRegex(descriptors, Pattern.compile(regex));
2621 }
2622
2623 if (cpHost != null) {
2624 cpHost.postGetTableNames(descriptors, regex);
2625 }
2626 }
2627
2628 List<TableName> result = new ArrayList<TableName>(descriptors.size());
2629 for (HTableDescriptor htd: descriptors) {
2630 result.add(htd.getTableName());
2631 }
2632 return result;
2633 }
2634
2635
2636
2637
2638
2639
2640
2641 private static void filterTablesByRegex(final Collection<HTableDescriptor> descriptors,
2642 final Pattern pattern) {
2643 final String defaultNS = NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
2644 Iterator<HTableDescriptor> itr = descriptors.iterator();
2645 while (itr.hasNext()) {
2646 HTableDescriptor htd = itr.next();
2647 String tableName = htd.getTableName().getNameAsString();
2648 boolean matched = pattern.matcher(tableName).matches();
2649 if (!matched && htd.getTableName().getNamespaceAsString().equals(defaultNS)) {
2650 matched = pattern.matcher(defaultNS + TableName.NAMESPACE_DELIM + tableName).matches();
2651 }
2652 if (!matched) {
2653 itr.remove();
2654 }
2655 }
2656 }
2657
2658 @Override
2659 public long getLastMajorCompactionTimestamp(TableName table) throws IOException {
2660 return getClusterStatus().getLastMajorCompactionTsForTable(table);
2661 }
2662
2663 @Override
2664 public long getLastMajorCompactionTimestampForRegion(byte[] regionName) throws IOException {
2665 return getClusterStatus().getLastMajorCompactionTsForRegion(regionName);
2666 }
2667
2668
2669
2670
2671
2672
2673
2674 public boolean isBalancerOn() {
2675 if (null == loadBalancerTracker) return false;
2676 return loadBalancerTracker.isBalancerOn();
2677 }
2678
2679
2680
2681
2682
2683
2684 public String getLoadBalancerClassName() {
2685 return conf.get(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, LoadBalancerFactory
2686 .getDefaultLoadBalancerClass().getName());
2687 }
2688 }