001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Comparator;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Objects;
026import java.util.Set;
027import java.util.TreeSet;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.ClusterManager.ServiceType;
030import org.apache.hadoop.hbase.client.Admin;
031import org.apache.hadoop.hbase.client.ClusterConnection;
032import org.apache.hadoop.hbase.client.Connection;
033import org.apache.hadoop.hbase.client.ConnectionFactory;
034import org.apache.hadoop.hbase.client.RegionInfo;
035import org.apache.hadoop.hbase.client.RegionLocator;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.hbase.util.Threads;
039import org.apache.yetus.audience.InterfaceAudience;
040
041import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
043import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
044
045/**
046 * Manages the interactions with an already deployed distributed cluster (as opposed to a
047 * pseudo-distributed, or mini/local cluster). This is used by integration and system tests.
048 */
049@InterfaceAudience.Private
050public class DistributedHBaseCluster extends HBaseCluster {
051  private Admin admin;
052  private final Connection connection;
053
054  private ClusterManager clusterManager;
  /**
   * Set of RegionServers killed so far. A ServerName also comprises the startCode of a server, so
   * any restarted instance of the same server will have a different ServerName and will not
   * coincide with past dead ones. So there's no need to clean up this set.
   */
060  private Set<ServerName> killedRegionServers = new HashSet<>();
061
062  public DistributedHBaseCluster(Configuration conf, ClusterManager clusterManager)
063    throws IOException {
064    super(conf);
065    this.clusterManager = clusterManager;
066    this.connection = ConnectionFactory.createConnection(conf);
067    this.admin = this.connection.getAdmin();
068    this.initialClusterStatus = getClusterMetrics();
069  }
070
071  public void setClusterManager(ClusterManager clusterManager) {
072    this.clusterManager = clusterManager;
073  }
074
075  public ClusterManager getClusterManager() {
076    return clusterManager;
077  }
078
079  /**
080   * Returns a ClusterStatus for this HBase cluster n
081   */
082  @Override
083  public ClusterMetrics getClusterMetrics() throws IOException {
084    return admin.getClusterMetrics();
085  }
086
087  @Override
088  public ClusterMetrics getInitialClusterMetrics() throws IOException {
089    return initialClusterStatus;
090  }
091
092  @Override
093  public void close() throws IOException {
094    if (this.admin != null) {
095      admin.close();
096    }
097    if (this.connection != null && !this.connection.isClosed()) {
098      this.connection.close();
099    }
100  }
101
102  @Override
103  public AdminProtos.AdminService.BlockingInterface getAdminProtocol(ServerName serverName)
104    throws IOException {
105    return ((ClusterConnection) this.connection).getAdmin(serverName);
106  }
107
108  @Override
109  public ClientProtos.ClientService.BlockingInterface getClientProtocol(ServerName serverName)
110    throws IOException {
111    return ((ClusterConnection) this.connection).getClient(serverName);
112  }
113
114  @Override
115  public void startRegionServer(String hostname, int port) throws IOException {
116    LOG.info("Starting RS on: {}", hostname);
117    clusterManager.start(ServiceType.HBASE_REGIONSERVER, hostname, port);
118  }
119
120  @Override
121  public void killRegionServer(ServerName serverName) throws IOException {
122    LOG.info("Aborting RS: {}", serverName.getServerName());
123    killedRegionServers.add(serverName);
124    clusterManager.kill(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
125      serverName.getPort());
126  }
127
128  @Override
129  public boolean isKilledRS(ServerName serverName) {
130    return killedRegionServers.contains(serverName);
131  }
132
133  @Override
134  public void stopRegionServer(ServerName serverName) throws IOException {
135    LOG.info("Stopping RS: {}", serverName.getServerName());
136    clusterManager.stop(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
137      serverName.getPort());
138  }
139
140  @Override
141  public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
142    waitForServiceToStop(ServiceType.HBASE_REGIONSERVER, serverName, timeout);
143  }
144
145  @Override
146  public void suspendRegionServer(ServerName serverName) throws IOException {
147    LOG.info("Suspend RS: {}", serverName.getServerName());
148    clusterManager.suspend(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
149      serverName.getPort());
150  }
151
152  @Override
153  public void resumeRegionServer(ServerName serverName) throws IOException {
154    LOG.info("Resume RS: {}", serverName.getServerName());
155    clusterManager.resume(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
156      serverName.getPort());
157  }
158
159  @Override
160  public void startZkNode(String hostname, int port) throws IOException {
161    LOG.info("Starting ZooKeeper node on: {}", hostname);
162    clusterManager.start(ServiceType.ZOOKEEPER_SERVER, hostname, port);
163  }
164
165  @Override
166  public void killZkNode(ServerName serverName) throws IOException {
167    LOG.info("Aborting ZooKeeper node on: {}", serverName.getServerName());
168    clusterManager.kill(ServiceType.ZOOKEEPER_SERVER, serverName.getHostname(),
169      serverName.getPort());
170  }
171
172  @Override
173  public void stopZkNode(ServerName serverName) throws IOException {
174    LOG.info("Stopping ZooKeeper node: {}", serverName.getServerName());
175    clusterManager.stop(ServiceType.ZOOKEEPER_SERVER, serverName.getHostname(),
176      serverName.getPort());
177  }
178
179  @Override
180  public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
181    waitForServiceToStart(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
182  }
183
184  @Override
185  public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
186    waitForServiceToStop(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
187  }
188
189  @Override
190  public void startDataNode(ServerName serverName) throws IOException {
191    LOG.info("Starting data node on: {}", serverName.getServerName());
192    clusterManager.start(ServiceType.HADOOP_DATANODE, serverName.getHostname(),
193      serverName.getPort());
194  }
195
196  @Override
197  public void killDataNode(ServerName serverName) throws IOException {
198    LOG.info("Aborting data node on: {}", serverName.getServerName());
199    clusterManager.kill(ServiceType.HADOOP_DATANODE, serverName.getHostname(),
200      serverName.getPort());
201  }
202
203  @Override
204  public void stopDataNode(ServerName serverName) throws IOException {
205    LOG.info("Stopping data node on: {}", serverName.getServerName());
206    clusterManager.stop(ServiceType.HADOOP_DATANODE, serverName.getHostname(),
207      serverName.getPort());
208  }
209
210  @Override
211  public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
212    waitForServiceToStart(ServiceType.HADOOP_DATANODE, serverName, timeout);
213  }
214
215  @Override
216  public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
217    waitForServiceToStop(ServiceType.HADOOP_DATANODE, serverName, timeout);
218  }
219
220  @Override
221  public void startNameNode(ServerName serverName) throws IOException {
222    LOG.info("Starting name node on: {}", serverName.getServerName());
223    clusterManager.start(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
224      serverName.getPort());
225  }
226
227  @Override
228  public void killNameNode(ServerName serverName) throws IOException {
229    LOG.info("Aborting name node on: {}", serverName.getServerName());
230    clusterManager.kill(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
231      serverName.getPort());
232  }
233
234  @Override
235  public void stopNameNode(ServerName serverName) throws IOException {
236    LOG.info("Stopping name node on: {}", serverName.getServerName());
237    clusterManager.stop(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
238      serverName.getPort());
239  }
240
241  @Override
242  public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException {
243    waitForServiceToStart(ServiceType.HADOOP_NAMENODE, serverName, timeout);
244  }
245
246  @Override
247  public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException {
248    waitForServiceToStop(ServiceType.HADOOP_NAMENODE, serverName, timeout);
249  }
250
251  private void waitForServiceToStop(ServiceType service, ServerName serverName, long timeout)
252    throws IOException {
253    LOG.info("Waiting for service: {} to stop: {}", service, serverName.getServerName());
254    long start = EnvironmentEdgeManager.currentTime();
255
256    while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
257      if (!clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
258        return;
259      }
260      Threads.sleep(100);
261    }
262    throw new IOException("Timed-out waiting for service to stop: " + serverName);
263  }
264
265  private void waitForServiceToStart(ServiceType service, ServerName serverName, long timeout)
266    throws IOException {
267    LOG.info("Waiting for service: {} to start: ", service, serverName.getServerName());
268    long start = EnvironmentEdgeManager.currentTime();
269
270    while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
271      if (clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
272        return;
273      }
274      Threads.sleep(100);
275    }
276    throw new IOException("Timed-out waiting for service to start: " + serverName);
277  }
278
279  @Override
280  public MasterService.BlockingInterface getMasterAdminService() throws IOException {
281    return ((ClusterConnection) this.connection).getMaster();
282  }
283
284  @Override
285  public void startMaster(String hostname, int port) throws IOException {
286    LOG.info("Starting Master on: {}:{}", hostname, port);
287    clusterManager.start(ServiceType.HBASE_MASTER, hostname, port);
288  }
289
290  @Override
291  public void killMaster(ServerName serverName) throws IOException {
292    LOG.info("Aborting Master: {}", serverName.getServerName());
293    clusterManager.kill(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
294  }
295
296  @Override
297  public void stopMaster(ServerName serverName) throws IOException {
298    LOG.info("Stopping Master: {}", serverName.getServerName());
299    clusterManager.stop(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
300  }
301
302  @Override
303  public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
304    waitForServiceToStop(ServiceType.HBASE_MASTER, serverName, timeout);
305  }
306
307  @Override
308  public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
309    long start = EnvironmentEdgeManager.currentTime();
310    while (EnvironmentEdgeManager.currentTime() - start < timeout) {
311      try {
312        getMasterAdminService();
313        return true;
314      } catch (MasterNotRunningException m) {
315        LOG.warn("Master not started yet " + m);
316      } catch (ZooKeeperConnectionException e) {
317        LOG.warn("Failed to connect to ZK " + e);
318      }
319      Threads.sleep(1000);
320    }
321    return false;
322  }
323
324  @Override
325  public ServerName getServerHoldingRegion(TableName tn, byte[] regionName) throws IOException {
326    byte[] startKey = RegionInfo.getStartKey(regionName);
327    HRegionLocation regionLoc = null;
328    try (RegionLocator locator = connection.getRegionLocator(tn)) {
329      regionLoc = locator.getRegionLocation(startKey, true);
330    }
331    if (regionLoc == null) {
332      LOG.warn("Cannot find region server holding region {}", Bytes.toStringBinary(regionName));
333      return null;
334    }
335    return regionLoc.getServerName();
336  }
337
338  @Override
339  public void waitUntilShutDown() {
340    // Simply wait for a few seconds for now (after issuing serverManager.kill
341    throw new RuntimeException(HConstants.NOT_IMPLEMENTED);
342  }
343
344  @Override
345  public void shutdown() throws IOException {
346    // not sure we want this
347    throw new RuntimeException(HConstants.NOT_IMPLEMENTED);
348  }
349
350  @Override
351  public boolean isDistributedCluster() {
352    return true;
353  }
354
355  @Override
356  public boolean restoreClusterMetrics(ClusterMetrics initial) throws IOException {
357    ClusterMetrics current = getClusterMetrics();
358
359    LOG.info("Restoring cluster - started");
360
361    // do a best effort restore
362    boolean success = true;
363    success = restoreMasters(initial, current) && success;
364    success = restoreRegionServers(initial, current) && success;
365    success = restoreAdmin() && success;
366
367    LOG.info("Restoring cluster - done");
368    return success;
369  }
370
371  protected boolean restoreMasters(ClusterMetrics initial, ClusterMetrics current) {
372    List<IOException> deferred = new ArrayList<>();
373    // check whether current master has changed
374    final ServerName initMaster = initial.getMasterName();
375    if (!ServerName.isSameAddress(initMaster, current.getMasterName())) {
376      LOG.info("Restoring cluster - Initial active master : {} has changed to : {}",
377        initMaster.getAddress(), current.getMasterName().getAddress());
378      // If initial master is stopped, start it, before restoring the state.
379      // It will come up as a backup master, if there is already an active master.
380      try {
381        if (
382          !clusterManager.isRunning(ServiceType.HBASE_MASTER, initMaster.getHostname(),
383            initMaster.getPort())
384        ) {
385          LOG.info("Restoring cluster - starting initial active master at:{}",
386            initMaster.getAddress());
387          startMaster(initMaster.getHostname(), initMaster.getPort());
388        }
389
390        // master has changed, we would like to undo this.
391        // 1. Kill the current backups
392        // 2. Stop current master
393        // 3. Start backup masters
394        for (ServerName currentBackup : current.getBackupMasterNames()) {
395          if (!ServerName.isSameAddress(currentBackup, initMaster)) {
396            LOG.info("Restoring cluster - stopping backup master: {}", currentBackup);
397            stopMaster(currentBackup);
398          }
399        }
400        LOG.info("Restoring cluster - stopping active master: {}", current.getMasterName());
401        stopMaster(current.getMasterName());
402        waitForActiveAndReadyMaster(); // wait so that active master takes over
403      } catch (IOException ex) {
404        // if we fail to start the initial active master, we do not want to continue stopping
405        // backup masters. Just keep what we have now
406        deferred.add(ex);
407      }
408
409      // start backup masters
410      for (ServerName backup : initial.getBackupMasterNames()) {
411        try {
412          // these are not started in backup mode, but we should already have an active master
413          if (
414            !clusterManager.isRunning(ServiceType.HBASE_MASTER, backup.getHostname(),
415              backup.getPort())
416          ) {
417            LOG.info("Restoring cluster - starting initial backup master: {}", backup.getAddress());
418            startMaster(backup.getHostname(), backup.getPort());
419          }
420        } catch (IOException ex) {
421          deferred.add(ex);
422        }
423      }
424    } else {
425      // current master has not changed, match up backup masters
426      Set<ServerName> toStart = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
427      Set<ServerName> toKill = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
428      toStart.addAll(initial.getBackupMasterNames());
429      toKill.addAll(current.getBackupMasterNames());
430
431      for (ServerName server : current.getBackupMasterNames()) {
432        toStart.remove(server);
433      }
434      for (ServerName server : initial.getBackupMasterNames()) {
435        toKill.remove(server);
436      }
437
438      for (ServerName sn : toStart) {
439        try {
440          if (!clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
441            LOG.info("Restoring cluster - starting initial backup master: {}", sn.getAddress());
442            startMaster(sn.getHostname(), sn.getPort());
443          }
444        } catch (IOException ex) {
445          deferred.add(ex);
446        }
447      }
448
449      for (ServerName sn : toKill) {
450        try {
451          if (clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
452            LOG.info("Restoring cluster - stopping backup master: {}", sn.getAddress());
453            stopMaster(sn);
454          }
455        } catch (IOException ex) {
456          deferred.add(ex);
457        }
458      }
459    }
460    if (!deferred.isEmpty()) {
461      LOG.warn("Restoring cluster - restoring region servers reported {} errors:", deferred.size());
462      for (int i = 0; i < deferred.size() && i < 3; i++) {
463        LOG.warn(Objects.toString(deferred.get(i)));
464      }
465    }
466
467    return deferred.isEmpty();
468  }
469
470  private static class ServerNameIgnoreStartCodeComparator implements Comparator<ServerName> {
471    @Override
472    public int compare(ServerName o1, ServerName o2) {
473      int compare = o1.getHostname().compareToIgnoreCase(o2.getHostname());
474      if (compare != 0) return compare;
475      compare = o1.getPort() - o2.getPort();
476      if (compare != 0) return compare;
477      return 0;
478    }
479  }
480
481  protected boolean restoreRegionServers(ClusterMetrics initial, ClusterMetrics current) {
482    Set<ServerName> toStart = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
483    Set<ServerName> toKill = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
484    toStart.addAll(initial.getLiveServerMetrics().keySet());
485    toKill.addAll(current.getLiveServerMetrics().keySet());
486
487    ServerName master = initial.getMasterName();
488
489    for (ServerName server : current.getLiveServerMetrics().keySet()) {
490      toStart.remove(server);
491    }
492    for (ServerName server : initial.getLiveServerMetrics().keySet()) {
493      toKill.remove(server);
494    }
495
496    List<IOException> deferred = new ArrayList<>();
497
498    for (ServerName sn : toStart) {
499      try {
500        if (
501          !clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(), sn.getPort())
502            && master.getPort() != sn.getPort()
503        ) {
504          LOG.info("Restoring cluster - starting initial region server: {}", sn.getAddress());
505          startRegionServer(sn.getHostname(), sn.getPort());
506        }
507      } catch (IOException ex) {
508        deferred.add(ex);
509      }
510    }
511
512    for (ServerName sn : toKill) {
513      try {
514        if (
515          clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(), sn.getPort())
516            && master.getPort() != sn.getPort()
517        ) {
518          LOG.info("Restoring cluster - stopping initial region server: {}", sn.getAddress());
519          stopRegionServer(sn);
520        }
521      } catch (IOException ex) {
522        deferred.add(ex);
523      }
524    }
525    if (!deferred.isEmpty()) {
526      LOG.warn("Restoring cluster - restoring region servers reported {} errors:", deferred.size());
527      for (int i = 0; i < deferred.size() && i < 3; i++) {
528        LOG.warn(Objects.toString(deferred.get(i)));
529      }
530    }
531
532    return deferred.isEmpty();
533  }
534
535  protected boolean restoreAdmin() throws IOException {
536    // While restoring above, if the HBase Master which was initially the Active one, was down
537    // and the restore put the cluster back to Initial configuration, HAdmin instance will need
538    // to refresh its connections (otherwise it will return incorrect information) or we can
539    // point it to new instance.
540    try {
541      admin.close();
542    } catch (IOException ioe) {
543      LOG.warn("While closing the old connection", ioe);
544    }
545    this.admin = this.connection.getAdmin();
546    LOG.info("Added new HBaseAdmin");
547    return true;
548  }
549}