001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Comparator;
023import java.util.EnumSet;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Objects;
027import java.util.Set;
028import java.util.TreeSet;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.ClusterManager.ServiceType;
031import org.apache.hadoop.hbase.client.Admin;
032import org.apache.hadoop.hbase.client.Connection;
033import org.apache.hadoop.hbase.client.ConnectionFactory;
034import org.apache.hadoop.hbase.client.RegionInfo;
035import org.apache.hadoop.hbase.client.RegionLocator;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.hbase.util.Threads;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 * Manages the interactions with an already deployed distributed cluster (as opposed to a
045 * pseudo-distributed, or mini/local cluster). This is used by integration and system tests.
046 */
047@InterfaceAudience.Private
048public class DistributedHBaseCluster extends HBaseClusterInterface {
049
050  private static final Logger LOG = LoggerFactory.getLogger(DistributedHBaseCluster.class);
051
052  private Admin admin;
053  private final Connection connection;
054
055  private ClusterManager clusterManager;
056  /**
057   * List of RegionServers killed so far. ServerName also comprises startCode of a server, so any
058   * restarted instances of the same server will have different ServerName and will not coincide
059   * with past dead ones. So there's no need to cleanup this list.
060   */
061  private Set<ServerName> killedRegionServers = new HashSet<>();
062
063  public DistributedHBaseCluster(Configuration conf, ClusterManager clusterManager)
064    throws IOException {
065    super(conf);
066    this.clusterManager = clusterManager;
067    this.connection = ConnectionFactory.createConnection(conf);
068    this.admin = this.connection.getAdmin();
069    this.initialClusterStatus = getClusterMetrics();
070  }
071
072  public void setClusterManager(ClusterManager clusterManager) {
073    this.clusterManager = clusterManager;
074  }
075
076  public ClusterManager getClusterManager() {
077    return clusterManager;
078  }
079
080  /**
081   * Returns a ClusterStatus for this HBase cluster n
082   */
083  @Override
084  public ClusterMetrics getClusterMetrics() throws IOException {
085    return admin.getClusterMetrics();
086  }
087
088  @Override
089  public ClusterMetrics getInitialClusterMetrics() throws IOException {
090    return initialClusterStatus;
091  }
092
093  @Override
094  public void close() throws IOException {
095    if (this.admin != null) {
096      admin.close();
097    }
098    if (this.connection != null && !this.connection.isClosed()) {
099      this.connection.close();
100    }
101  }
102
103  @Override
104  public void startRegionServer(String hostname, int port) throws IOException {
105    LOG.info("Starting RS on: {}", hostname);
106    clusterManager.start(ServiceType.HBASE_REGIONSERVER, hostname, port);
107  }
108
109  @Override
110  public void killRegionServer(ServerName serverName) throws IOException {
111    LOG.info("Aborting RS: {}", serverName.getServerName());
112    killedRegionServers.add(serverName);
113    clusterManager.kill(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
114      serverName.getPort());
115  }
116
117  @Override
118  public boolean isKilledRS(ServerName serverName) {
119    return killedRegionServers.contains(serverName);
120  }
121
122  @Override
123  public void stopRegionServer(ServerName serverName) throws IOException {
124    LOG.info("Stopping RS: {}", serverName.getServerName());
125    clusterManager.stop(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
126      serverName.getPort());
127  }
128
129  @Override
130  public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
131    waitForServiceToStop(ServiceType.HBASE_REGIONSERVER, serverName, timeout);
132  }
133
134  @Override
135  public void suspendRegionServer(ServerName serverName) throws IOException {
136    LOG.info("Suspend RS: {}", serverName.getServerName());
137    clusterManager.suspend(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
138      serverName.getPort());
139  }
140
141  @Override
142  public void resumeRegionServer(ServerName serverName) throws IOException {
143    LOG.info("Resume RS: {}", serverName.getServerName());
144    clusterManager.resume(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
145      serverName.getPort());
146  }
147
148  @Override
149  public void startZkNode(String hostname, int port) throws IOException {
150    LOG.info("Starting ZooKeeper node on: {}", hostname);
151    clusterManager.start(ServiceType.ZOOKEEPER_SERVER, hostname, port);
152  }
153
154  @Override
155  public void killZkNode(ServerName serverName) throws IOException {
156    LOG.info("Aborting ZooKeeper node on: {}", serverName.getServerName());
157    clusterManager.kill(ServiceType.ZOOKEEPER_SERVER, serverName.getHostname(),
158      serverName.getPort());
159  }
160
161  @Override
162  public void stopZkNode(ServerName serverName) throws IOException {
163    LOG.info("Stopping ZooKeeper node: {}", serverName.getServerName());
164    clusterManager.stop(ServiceType.ZOOKEEPER_SERVER, serverName.getHostname(),
165      serverName.getPort());
166  }
167
168  @Override
169  public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
170    waitForServiceToStart(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
171  }
172
173  @Override
174  public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
175    waitForServiceToStop(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
176  }
177
178  @Override
179  public void startDataNode(ServerName serverName) throws IOException {
180    LOG.info("Starting data node on: {}", serverName.getServerName());
181    clusterManager.start(ServiceType.HADOOP_DATANODE, serverName.getHostname(),
182      serverName.getPort());
183  }
184
185  @Override
186  public void killDataNode(ServerName serverName) throws IOException {
187    LOG.info("Aborting data node on: {}", serverName.getServerName());
188    clusterManager.kill(ServiceType.HADOOP_DATANODE, serverName.getHostname(),
189      serverName.getPort());
190  }
191
192  @Override
193  public void stopDataNode(ServerName serverName) throws IOException {
194    LOG.info("Stopping data node on: {}", serverName.getServerName());
195    clusterManager.stop(ServiceType.HADOOP_DATANODE, serverName.getHostname(),
196      serverName.getPort());
197  }
198
199  @Override
200  public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
201    waitForServiceToStart(ServiceType.HADOOP_DATANODE, serverName, timeout);
202  }
203
204  @Override
205  public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
206    waitForServiceToStop(ServiceType.HADOOP_DATANODE, serverName, timeout);
207  }
208
209  @Override
210  public void startNameNode(ServerName serverName) throws IOException {
211    LOG.info("Starting name node on: {}", serverName.getServerName());
212    clusterManager.start(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
213      serverName.getPort());
214  }
215
216  @Override
217  public void killNameNode(ServerName serverName) throws IOException {
218    LOG.info("Aborting name node on: {}", serverName.getServerName());
219    clusterManager.kill(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
220      serverName.getPort());
221  }
222
223  @Override
224  public void stopNameNode(ServerName serverName) throws IOException {
225    LOG.info("Stopping name node on: {}", serverName.getServerName());
226    clusterManager.stop(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
227      serverName.getPort());
228  }
229
230  @Override
231  public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException {
232    waitForServiceToStart(ServiceType.HADOOP_NAMENODE, serverName, timeout);
233  }
234
235  @Override
236  public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException {
237    waitForServiceToStop(ServiceType.HADOOP_NAMENODE, serverName, timeout);
238  }
239
240  private void waitForServiceToStop(ServiceType service, ServerName serverName, long timeout)
241    throws IOException {
242    LOG.info("Waiting for service: {} to stop: {}", service, serverName.getServerName());
243    long start = EnvironmentEdgeManager.currentTime();
244
245    while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
246      if (!clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
247        return;
248      }
249      Threads.sleep(100);
250    }
251    throw new IOException("Timed-out waiting for service to stop: " + serverName);
252  }
253
254  private void waitForServiceToStart(ServiceType service, ServerName serverName, long timeout)
255    throws IOException {
256    LOG.info("Waiting for service: {} to start: ", service, serverName.getServerName());
257    long start = EnvironmentEdgeManager.currentTime();
258
259    while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
260      if (clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
261        return;
262      }
263      Threads.sleep(100);
264    }
265    throw new IOException("Timed-out waiting for service to start: " + serverName);
266  }
267
268  @Override
269  public void startMaster(String hostname, int port) throws IOException {
270    LOG.info("Starting Master on: {}:{}", hostname, port);
271    clusterManager.start(ServiceType.HBASE_MASTER, hostname, port);
272  }
273
274  @Override
275  public void killMaster(ServerName serverName) throws IOException {
276    LOG.info("Aborting Master: {}", serverName.getServerName());
277    clusterManager.kill(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
278  }
279
280  @Override
281  public void stopMaster(ServerName serverName) throws IOException {
282    LOG.info("Stopping Master: {}", serverName.getServerName());
283    clusterManager.stop(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
284  }
285
286  @Override
287  public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
288    waitForServiceToStop(ServiceType.HBASE_MASTER, serverName, timeout);
289  }
290
291  @Override
292  public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
293    long start = EnvironmentEdgeManager.currentTime();
294    while (EnvironmentEdgeManager.currentTime() - start < timeout) {
295      try {
296        connection.getAdmin().getClusterMetrics(EnumSet.of(ClusterMetrics.Option.HBASE_VERSION));
297        return true;
298      } catch (MasterNotRunningException m) {
299        LOG.warn("Master not started yet " + m);
300      } catch (ZooKeeperConnectionException e) {
301        LOG.warn("Failed to connect to ZK " + e);
302      }
303      Threads.sleep(1000);
304    }
305    return false;
306  }
307
308  @Override
309  public ServerName getServerHoldingRegion(TableName tn, byte[] regionName) throws IOException {
310    byte[] startKey = RegionInfo.getStartKey(regionName);
311    HRegionLocation regionLoc = null;
312    try (RegionLocator locator = connection.getRegionLocator(tn)) {
313      regionLoc = locator.getRegionLocation(startKey, true);
314    }
315    if (regionLoc == null) {
316      LOG.warn("Cannot find region server holding region {}", Bytes.toStringBinary(regionName));
317      return null;
318    }
319    return regionLoc.getServerName();
320  }
321
322  @Override
323  public void waitUntilShutDown() {
324    // Simply wait for a few seconds for now (after issuing serverManager.kill
325    throw new RuntimeException(HConstants.NOT_IMPLEMENTED);
326  }
327
328  @Override
329  public void shutdown() throws IOException {
330    // not sure we want this
331    throw new RuntimeException(HConstants.NOT_IMPLEMENTED);
332  }
333
334  @Override
335  public boolean isDistributedCluster() {
336    return true;
337  }
338
339  @Override
340  public boolean restoreClusterMetrics(ClusterMetrics initial) throws IOException {
341    ClusterMetrics current = getClusterMetrics();
342
343    LOG.info("Restoring cluster - started");
344
345    // do a best effort restore
346    boolean success = true;
347    success = restoreMasters(initial, current) & success;
348    success = restoreRegionServers(initial, current) & success;
349    success = restoreAdmin() & success;
350
351    LOG.info("Restoring cluster - done");
352    return success;
353  }
354
355  protected boolean restoreMasters(ClusterMetrics initial, ClusterMetrics current) {
356    List<IOException> deferred = new ArrayList<>();
357    // check whether current master has changed
358    final ServerName initMaster = initial.getMasterName();
359    if (!ServerName.isSameAddress(initMaster, current.getMasterName())) {
360      LOG.info("Restoring cluster - Initial active master : {} has changed to : {}",
361        initMaster.getAddress(), current.getMasterName().getAddress());
362      // If initial master is stopped, start it, before restoring the state.
363      // It will come up as a backup master, if there is already an active master.
364      try {
365        if (
366          !clusterManager.isRunning(ServiceType.HBASE_MASTER, initMaster.getHostname(),
367            initMaster.getPort())
368        ) {
369          LOG.info("Restoring cluster - starting initial active master at:{}",
370            initMaster.getAddress());
371          startMaster(initMaster.getHostname(), initMaster.getPort());
372        }
373
374        // master has changed, we would like to undo this.
375        // 1. Kill the current backups
376        // 2. Stop current master
377        // 3. Start backup masters
378        for (ServerName currentBackup : current.getBackupMasterNames()) {
379          if (!ServerName.isSameAddress(currentBackup, initMaster)) {
380            LOG.info("Restoring cluster - stopping backup master: {}", currentBackup);
381            stopMaster(currentBackup);
382          }
383        }
384        LOG.info("Restoring cluster - stopping active master: {}", current.getMasterName());
385        stopMaster(current.getMasterName());
386        waitForActiveAndReadyMaster(); // wait so that active master takes over
387      } catch (IOException ex) {
388        // if we fail to start the initial active master, we do not want to continue stopping
389        // backup masters. Just keep what we have now
390        deferred.add(ex);
391      }
392
393      // start backup masters
394      for (ServerName backup : initial.getBackupMasterNames()) {
395        try {
396          // these are not started in backup mode, but we should already have an active master
397          if (
398            !clusterManager.isRunning(ServiceType.HBASE_MASTER, backup.getHostname(),
399              backup.getPort())
400          ) {
401            LOG.info("Restoring cluster - starting initial backup master: {}", backup.getAddress());
402            startMaster(backup.getHostname(), backup.getPort());
403          }
404        } catch (IOException ex) {
405          deferred.add(ex);
406        }
407      }
408    } else {
409      // current master has not changed, match up backup masters
410      Set<ServerName> toStart = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
411      Set<ServerName> toKill = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
412      toStart.addAll(initial.getBackupMasterNames());
413      toKill.addAll(current.getBackupMasterNames());
414
415      for (ServerName server : current.getBackupMasterNames()) {
416        toStart.remove(server);
417      }
418      for (ServerName server : initial.getBackupMasterNames()) {
419        toKill.remove(server);
420      }
421
422      for (ServerName sn : toStart) {
423        try {
424          if (!clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
425            LOG.info("Restoring cluster - starting initial backup master: {}", sn.getAddress());
426            startMaster(sn.getHostname(), sn.getPort());
427          }
428        } catch (IOException ex) {
429          deferred.add(ex);
430        }
431      }
432
433      for (ServerName sn : toKill) {
434        try {
435          if (clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
436            LOG.info("Restoring cluster - stopping backup master: {}", sn.getAddress());
437            stopMaster(sn);
438          }
439        } catch (IOException ex) {
440          deferred.add(ex);
441        }
442      }
443    }
444    if (!deferred.isEmpty()) {
445      LOG.warn("Restoring cluster - restoring region servers reported {} errors:", deferred.size());
446      for (int i = 0; i < deferred.size() && i < 3; i++) {
447        LOG.warn(Objects.toString(deferred.get(i)));
448      }
449    }
450
451    return deferred.isEmpty();
452  }
453
454  private static class ServerNameIgnoreStartCodeComparator implements Comparator<ServerName> {
455    @Override
456    public int compare(ServerName o1, ServerName o2) {
457      int compare = o1.getHostname().compareToIgnoreCase(o2.getHostname());
458      if (compare != 0) return compare;
459      compare = o1.getPort() - o2.getPort();
460      if (compare != 0) return compare;
461      return 0;
462    }
463  }
464
465  protected boolean restoreRegionServers(ClusterMetrics initial, ClusterMetrics current) {
466    Set<ServerName> toStart = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
467    Set<ServerName> toKill = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
468    toStart.addAll(initial.getLiveServerMetrics().keySet());
469    toKill.addAll(current.getLiveServerMetrics().keySet());
470
471    ServerName master = initial.getMasterName();
472
473    for (ServerName server : current.getLiveServerMetrics().keySet()) {
474      toStart.remove(server);
475    }
476    for (ServerName server : initial.getLiveServerMetrics().keySet()) {
477      toKill.remove(server);
478    }
479
480    List<IOException> deferred = new ArrayList<>();
481
482    for (ServerName sn : toStart) {
483      try {
484        if (
485          !clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(), sn.getPort())
486            && master.getPort() != sn.getPort()
487        ) {
488          LOG.info("Restoring cluster - starting initial region server: {}", sn.getAddress());
489          startRegionServer(sn.getHostname(), sn.getPort());
490        }
491      } catch (IOException ex) {
492        deferred.add(ex);
493      }
494    }
495
496    for (ServerName sn : toKill) {
497      try {
498        if (
499          clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(), sn.getPort())
500            && master.getPort() != sn.getPort()
501        ) {
502          LOG.info("Restoring cluster - stopping initial region server: {}", sn.getAddress());
503          stopRegionServer(sn);
504        }
505      } catch (IOException ex) {
506        deferred.add(ex);
507      }
508    }
509    if (!deferred.isEmpty()) {
510      LOG.warn("Restoring cluster - restoring region servers reported {} errors:", deferred.size());
511      for (int i = 0; i < deferred.size() && i < 3; i++) {
512        LOG.warn(Objects.toString(deferred.get(i)));
513      }
514    }
515
516    return deferred.isEmpty();
517  }
518
519  protected boolean restoreAdmin() throws IOException {
520    // While restoring above, if the HBase Master which was initially the Active one, was down
521    // and the restore put the cluster back to Initial configuration, HAdmin instance will need
522    // to refresh its connections (otherwise it will return incorrect information) or we can
523    // point it to new instance.
524    admin.close();
525    this.admin = this.connection.getAdmin();
526    LOG.info("Added new HBaseAdmin");
527    return true;
528  }
529}