001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Comparator;
023import java.util.EnumSet;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Objects;
027import java.util.Set;
028import java.util.TreeSet;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.ClusterManager.ServiceType;
031import org.apache.hadoop.hbase.client.Admin;
032import org.apache.hadoop.hbase.client.Connection;
033import org.apache.hadoop.hbase.client.ConnectionFactory;
034import org.apache.hadoop.hbase.client.RegionInfo;
035import org.apache.hadoop.hbase.client.RegionLocator;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.hbase.util.Threads;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 * Manages the interactions with an already deployed distributed cluster (as opposed to a
045 * pseudo-distributed, or mini/local cluster). This is used by integration and system tests.
046 */
047@InterfaceAudience.Private
048public class DistributedHBaseCluster extends HBaseClusterInterface {
049
  private static final Logger LOG = LoggerFactory.getLogger(DistributedHBaseCluster.class);

  /** Admin obtained from {@link #connection}; not final because restoreAdmin() replaces it. */
  private Admin admin;
  /** Connection created in the constructor; close() closes it if it is still open. */
  private final Connection connection;

  /** Delegate that performs the start/stop/kill/suspend/resume/isRunning calls on services. */
  private ClusterManager clusterManager;
  /**
   * Set of RegionServers killed so far through killRegionServer. ServerName also comprises
   * startCode of a server, so any restarted instances of the same server will have different
   * ServerName and will not coincide with past dead ones. So there's no need to cleanup this set.
   */
  private final Set<ServerName> killedRegionServers = new HashSet<>();
062
063  public DistributedHBaseCluster(Configuration conf, ClusterManager clusterManager)
064    throws IOException {
065    super(conf);
066    this.clusterManager = clusterManager;
067    this.connection = ConnectionFactory.createConnection(conf);
068    this.admin = this.connection.getAdmin();
069    this.initialClusterStatus = getClusterMetrics();
070  }
071
072  public void setClusterManager(ClusterManager clusterManager) {
073    this.clusterManager = clusterManager;
074  }
075
076  public ClusterManager getClusterManager() {
077    return clusterManager;
078  }
079
080  /**
081   * Returns a ClusterStatus for this HBase cluster
082   */
083  @Override
084  public ClusterMetrics getClusterMetrics() throws IOException {
085    return admin.getClusterMetrics();
086  }
087
088  @Override
089  public ClusterMetrics getInitialClusterMetrics() throws IOException {
090    return initialClusterStatus;
091  }
092
093  @Override
094  public void close() throws IOException {
095    if (this.admin != null) {
096      admin.close();
097    }
098    if (this.connection != null && !this.connection.isClosed()) {
099      this.connection.close();
100    }
101  }
102
103  @Override
104  public void startRegionServer(String hostname, int port) throws IOException {
105    LOG.info("Starting RS on: {}", hostname);
106    clusterManager.start(ServiceType.HBASE_REGIONSERVER, hostname, port);
107  }
108
109  @Override
110  public void killRegionServer(ServerName serverName) throws IOException {
111    LOG.info("Aborting RS: {}", serverName.getServerName());
112    killedRegionServers.add(serverName);
113    clusterManager.kill(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
114      serverName.getPort());
115  }
116
117  @Override
118  public boolean isKilledRS(ServerName serverName) {
119    return killedRegionServers.contains(serverName);
120  }
121
122  @Override
123  public void stopRegionServer(ServerName serverName) throws IOException {
124    LOG.info("Stopping RS: {}", serverName.getServerName());
125    clusterManager.stop(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
126      serverName.getPort());
127  }
128
129  @Override
130  public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
131    waitForServiceToStop(ServiceType.HBASE_REGIONSERVER, serverName, timeout);
132  }
133
134  @Override
135  public void suspendRegionServer(ServerName serverName) throws IOException {
136    LOG.info("Suspend RS: {}", serverName.getServerName());
137    clusterManager.suspend(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
138      serverName.getPort());
139  }
140
141  @Override
142  public void resumeRegionServer(ServerName serverName) throws IOException {
143    LOG.info("Resume RS: {}", serverName.getServerName());
144    clusterManager.resume(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
145      serverName.getPort());
146  }
147
148  @Override
149  public void startZkNode(String hostname, int port) throws IOException {
150    LOG.info("Starting ZooKeeper node on: {}", hostname);
151    clusterManager.start(ServiceType.ZOOKEEPER_SERVER, hostname, port);
152  }
153
154  @Override
155  public void killZkNode(ServerName serverName) throws IOException {
156    LOG.info("Aborting ZooKeeper node on: {}", serverName.getServerName());
157    clusterManager.kill(ServiceType.ZOOKEEPER_SERVER, serverName.getHostname(),
158      serverName.getPort());
159  }
160
161  @Override
162  public void stopZkNode(ServerName serverName) throws IOException {
163    LOG.info("Stopping ZooKeeper node: {}", serverName.getServerName());
164    clusterManager.stop(ServiceType.ZOOKEEPER_SERVER, serverName.getHostname(),
165      serverName.getPort());
166  }
167
168  @Override
169  public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
170    waitForServiceToStart(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
171  }
172
173  @Override
174  public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
175    waitForServiceToStop(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
176  }
177
178  @Override
179  public void startDataNode(ServerName serverName) throws IOException {
180    LOG.info("Starting data node on: {}", serverName.getServerName());
181    clusterManager.start(ServiceType.HADOOP_DATANODE, serverName.getHostname(),
182      serverName.getPort());
183  }
184
185  @Override
186  public void killDataNode(ServerName serverName) throws IOException {
187    LOG.info("Aborting data node on: {}", serverName.getServerName());
188    clusterManager.kill(ServiceType.HADOOP_DATANODE, serverName.getHostname(),
189      serverName.getPort());
190  }
191
192  @Override
193  public void stopDataNode(ServerName serverName) throws IOException {
194    LOG.info("Stopping data node on: {}", serverName.getServerName());
195    clusterManager.stop(ServiceType.HADOOP_DATANODE, serverName.getHostname(),
196      serverName.getPort());
197  }
198
199  @Override
200  public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
201    waitForServiceToStart(ServiceType.HADOOP_DATANODE, serverName, timeout);
202  }
203
204  @Override
205  public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
206    waitForServiceToStop(ServiceType.HADOOP_DATANODE, serverName, timeout);
207  }
208
209  @Override
210  public void startNameNode(ServerName serverName) throws IOException {
211    LOG.info("Starting name node on: {}", serverName.getServerName());
212    clusterManager.start(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
213      serverName.getPort());
214  }
215
216  @Override
217  public void killNameNode(ServerName serverName) throws IOException {
218    LOG.info("Aborting name node on: {}", serverName.getServerName());
219    clusterManager.kill(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
220      serverName.getPort());
221  }
222
223  @Override
224  public void stopNameNode(ServerName serverName) throws IOException {
225    LOG.info("Stopping name node on: {}", serverName.getServerName());
226    clusterManager.stop(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
227      serverName.getPort());
228  }
229
230  @Override
231  public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException {
232    waitForServiceToStart(ServiceType.HADOOP_NAMENODE, serverName, timeout);
233  }
234
235  @Override
236  public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException {
237    waitForServiceToStop(ServiceType.HADOOP_NAMENODE, serverName, timeout);
238  }
239
240  @Override
241  public void startJournalNode(ServerName serverName) throws IOException {
242    LOG.info("Starting journal node on: {}", serverName.getServerName());
243    clusterManager.start(ServiceType.HADOOP_JOURNALNODE, serverName.getHostname(),
244      serverName.getPort());
245  }
246
247  @Override
248  public void killJournalNode(ServerName serverName) throws IOException {
249    LOG.info("Aborting journal node on: {}", serverName.getServerName());
250    clusterManager.kill(ServiceType.HADOOP_JOURNALNODE, serverName.getHostname(),
251      serverName.getPort());
252  }
253
254  @Override
255  public void stopJournalNode(ServerName serverName) throws IOException {
256    LOG.info("Stopping journal node on: {}", serverName.getServerName());
257    clusterManager.stop(ServiceType.HADOOP_JOURNALNODE, serverName.getHostname(),
258      serverName.getPort());
259  }
260
261  @Override
262  public void waitForJournalNodeToStart(ServerName serverName, long timeout) throws IOException {
263    waitForServiceToStart(ServiceType.HADOOP_JOURNALNODE, serverName, timeout);
264  }
265
266  @Override
267  public void waitForJournalNodeToStop(ServerName serverName, long timeout) throws IOException {
268    waitForServiceToStop(ServiceType.HADOOP_JOURNALNODE, serverName, timeout);
269  }
270
271  private void waitForServiceToStop(ServiceType service, ServerName serverName, long timeout)
272    throws IOException {
273    LOG.info("Waiting for service: {} to stop: {}", service, serverName.getServerName());
274    long start = EnvironmentEdgeManager.currentTime();
275
276    while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
277      if (!clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
278        return;
279      }
280      Threads.sleep(100);
281    }
282    throw new IOException("Timed-out waiting for service to stop: " + serverName);
283  }
284
285  private void waitForServiceToStart(ServiceType service, ServerName serverName, long timeout)
286    throws IOException {
287    LOG.info("Waiting for service: {} to start: {}", service, serverName.getServerName());
288    long start = EnvironmentEdgeManager.currentTime();
289
290    while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
291      if (clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
292        return;
293      }
294      Threads.sleep(100);
295    }
296    throw new IOException("Timed-out waiting for service to start: " + serverName);
297  }
298
299  @Override
300  public void startMaster(String hostname, int port) throws IOException {
301    LOG.info("Starting Master on: {}:{}", hostname, port);
302    clusterManager.start(ServiceType.HBASE_MASTER, hostname, port);
303  }
304
305  @Override
306  public void killMaster(ServerName serverName) throws IOException {
307    LOG.info("Aborting Master: {}", serverName.getServerName());
308    clusterManager.kill(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
309  }
310
311  @Override
312  public void stopMaster(ServerName serverName) throws IOException {
313    LOG.info("Stopping Master: {}", serverName.getServerName());
314    clusterManager.stop(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
315  }
316
317  @Override
318  public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
319    waitForServiceToStop(ServiceType.HBASE_MASTER, serverName, timeout);
320  }
321
322  @Override
323  public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
324    long start = EnvironmentEdgeManager.currentTime();
325    while (EnvironmentEdgeManager.currentTime() - start < timeout) {
326      try {
327        connection.getAdmin().getClusterMetrics(EnumSet.of(ClusterMetrics.Option.HBASE_VERSION));
328        return true;
329      } catch (MasterNotRunningException m) {
330        LOG.warn("Master not started yet " + m);
331      } catch (ZooKeeperConnectionException e) {
332        LOG.warn("Failed to connect to ZK " + e);
333      }
334      Threads.sleep(1000);
335    }
336    return false;
337  }
338
339  @Override
340  public ServerName getServerHoldingRegion(TableName tn, byte[] regionName) throws IOException {
341    byte[] startKey = RegionInfo.getStartKey(regionName);
342    HRegionLocation regionLoc = null;
343    try (RegionLocator locator = connection.getRegionLocator(tn)) {
344      regionLoc = locator.getRegionLocation(startKey, true);
345    }
346    if (regionLoc == null) {
347      LOG.warn("Cannot find region server holding region {}", Bytes.toStringBinary(regionName));
348      return null;
349    }
350    return regionLoc.getServerName();
351  }
352
353  @Override
354  public void waitUntilShutDown() {
355    // Simply wait for a few seconds for now (after issuing serverManager.kill
356    throw new RuntimeException(HConstants.NOT_IMPLEMENTED);
357  }
358
359  @Override
360  public void shutdown() throws IOException {
361    // not sure we want this
362    throw new RuntimeException(HConstants.NOT_IMPLEMENTED);
363  }
364
365  @Override
366  public boolean isDistributedCluster() {
367    return true;
368  }
369
370  @Override
371  public boolean restoreClusterMetrics(ClusterMetrics initial) throws IOException {
372    ClusterMetrics current = getClusterMetrics();
373
374    LOG.info("Restoring cluster - started");
375
376    // do a best effort restore
377    boolean success = restoreMasters(initial, current);
378    success = restoreRegionServers(initial, current) && success;
379    success = restoreAdmin() && success;
380
381    LOG.info("Restoring cluster - done");
382    return success;
383  }
384
385  protected boolean restoreMasters(ClusterMetrics initial, ClusterMetrics current) {
386    List<IOException> deferred = new ArrayList<>();
387    // check whether current master has changed
388    final ServerName initMaster = initial.getMasterName();
389    if (!ServerName.isSameAddress(initMaster, current.getMasterName())) {
390      LOG.info("Restoring cluster - Initial active master : {} has changed to : {}",
391        initMaster.getAddress(), current.getMasterName().getAddress());
392      // If initial master is stopped, start it, before restoring the state.
393      // It will come up as a backup master, if there is already an active master.
394      try {
395        if (
396          !clusterManager.isRunning(ServiceType.HBASE_MASTER, initMaster.getHostname(),
397            initMaster.getPort())
398        ) {
399          LOG.info("Restoring cluster - starting initial active master at:{}",
400            initMaster.getAddress());
401          startMaster(initMaster.getHostname(), initMaster.getPort());
402        }
403
404        // master has changed, we would like to undo this.
405        // 1. Kill the current backups
406        // 2. Stop current master
407        // 3. Start backup masters
408        for (ServerName currentBackup : current.getBackupMasterNames()) {
409          if (!ServerName.isSameAddress(currentBackup, initMaster)) {
410            LOG.info("Restoring cluster - stopping backup master: {}", currentBackup);
411            stopMaster(currentBackup);
412          }
413        }
414        LOG.info("Restoring cluster - stopping active master: {}", current.getMasterName());
415        stopMaster(current.getMasterName());
416        waitForActiveAndReadyMaster(); // wait so that active master takes over
417      } catch (IOException ex) {
418        // if we fail to start the initial active master, we do not want to continue stopping
419        // backup masters. Just keep what we have now
420        deferred.add(ex);
421      }
422
423      // start backup masters
424      for (ServerName backup : initial.getBackupMasterNames()) {
425        try {
426          // these are not started in backup mode, but we should already have an active master
427          if (
428            !clusterManager.isRunning(ServiceType.HBASE_MASTER, backup.getHostname(),
429              backup.getPort())
430          ) {
431            LOG.info("Restoring cluster - starting initial backup master: {}", backup.getAddress());
432            startMaster(backup.getHostname(), backup.getPort());
433          }
434        } catch (IOException ex) {
435          deferred.add(ex);
436        }
437      }
438    } else {
439      // current master has not changed, match up backup masters
440      Set<ServerName> toStart = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
441      Set<ServerName> toKill = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
442      toStart.addAll(initial.getBackupMasterNames());
443      toKill.addAll(current.getBackupMasterNames());
444
445      for (ServerName server : current.getBackupMasterNames()) {
446        toStart.remove(server);
447      }
448      for (ServerName server : initial.getBackupMasterNames()) {
449        toKill.remove(server);
450      }
451
452      for (ServerName sn : toStart) {
453        try {
454          if (!clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
455            LOG.info("Restoring cluster - starting initial backup master: {}", sn.getAddress());
456            startMaster(sn.getHostname(), sn.getPort());
457          }
458        } catch (IOException ex) {
459          deferred.add(ex);
460        }
461      }
462
463      for (ServerName sn : toKill) {
464        try {
465          if (clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
466            LOG.info("Restoring cluster - stopping backup master: {}", sn.getAddress());
467            stopMaster(sn);
468          }
469        } catch (IOException ex) {
470          deferred.add(ex);
471        }
472      }
473    }
474    if (!deferred.isEmpty()) {
475      LOG.warn("Restoring cluster - restoring region servers reported {} errors:", deferred.size());
476      for (int i = 0; i < deferred.size() && i < 3; i++) {
477        LOG.warn(Objects.toString(deferred.get(i)));
478      }
479    }
480
481    return deferred.isEmpty();
482  }
483
484  private static class ServerNameIgnoreStartCodeComparator implements Comparator<ServerName> {
485    @Override
486    public int compare(ServerName o1, ServerName o2) {
487      int compare = o1.getHostname().compareToIgnoreCase(o2.getHostname());
488      if (compare != 0) return compare;
489      compare = o1.getPort() - o2.getPort();
490      if (compare != 0) return compare;
491      return 0;
492    }
493  }
494
495  protected boolean restoreRegionServers(ClusterMetrics initial, ClusterMetrics current) {
496    Set<ServerName> toStart = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
497    Set<ServerName> toKill = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
498    toStart.addAll(initial.getLiveServerMetrics().keySet());
499    toKill.addAll(current.getLiveServerMetrics().keySet());
500
501    ServerName master = initial.getMasterName();
502
503    for (ServerName server : current.getLiveServerMetrics().keySet()) {
504      toStart.remove(server);
505    }
506    for (ServerName server : initial.getLiveServerMetrics().keySet()) {
507      toKill.remove(server);
508    }
509
510    List<IOException> deferred = new ArrayList<>();
511
512    for (ServerName sn : toStart) {
513      try {
514        if (
515          !clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(), sn.getPort())
516            && master.getPort() != sn.getPort()
517        ) {
518          LOG.info("Restoring cluster - starting initial region server: {}", sn.getAddress());
519          startRegionServer(sn.getHostname(), sn.getPort());
520        }
521      } catch (IOException ex) {
522        deferred.add(ex);
523      }
524    }
525
526    for (ServerName sn : toKill) {
527      try {
528        if (
529          clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(), sn.getPort())
530            && master.getPort() != sn.getPort()
531        ) {
532          LOG.info("Restoring cluster - stopping initial region server: {}", sn.getAddress());
533          stopRegionServer(sn);
534        }
535      } catch (IOException ex) {
536        deferred.add(ex);
537      }
538    }
539    if (!deferred.isEmpty()) {
540      LOG.warn("Restoring cluster - restoring region servers reported {} errors:", deferred.size());
541      for (int i = 0; i < deferred.size() && i < 3; i++) {
542        LOG.warn(Objects.toString(deferred.get(i)));
543      }
544    }
545
546    return deferred.isEmpty();
547  }
548
549  protected boolean restoreAdmin() throws IOException {
550    // While restoring above, if the HBase Master which was initially the Active one, was down
551    // and the restore put the cluster back to Initial configuration, HAdmin instance will need
552    // to refresh its connections (otherwise it will return incorrect information) or we can
553    // point it to new instance.
554    admin.close();
555    this.admin = this.connection.getAdmin();
556    LOG.info("Added new HBaseAdmin");
557    return true;
558  }
559}