001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Comparator;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Objects;
026import java.util.Set;
027import java.util.TreeSet;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.ClusterManager.ServiceType;
030import org.apache.hadoop.hbase.client.Admin;
031import org.apache.hadoop.hbase.client.ClusterConnection;
032import org.apache.hadoop.hbase.client.Connection;
033import org.apache.hadoop.hbase.client.ConnectionFactory;
034import org.apache.hadoop.hbase.client.RegionInfo;
035import org.apache.hadoop.hbase.client.RegionLocator;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.Threads;
038import org.apache.yetus.audience.InterfaceAudience;
039
040import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
041import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
043
044/**
045 * Manages the interactions with an already deployed distributed cluster (as opposed to
046 * a pseudo-distributed, or mini/local cluster). This is used by integration and system tests.
047 */
048@InterfaceAudience.Private
049public class DistributedHBaseCluster extends HBaseCluster {
050  private Admin admin;
051  private final Connection connection;
052
053  private ClusterManager clusterManager;
054  /**
055   * List of RegionServers killed so far. ServerName also comprises startCode of a server,
056   * so any restarted instances of the same server will have different ServerName and will not
057   * coincide with past dead ones. So there's no need to cleanup this list.
058   */
059  private Set<ServerName> killedRegionServers = new HashSet<>();
060
061  public DistributedHBaseCluster(Configuration conf, ClusterManager clusterManager)
062      throws IOException {
063    super(conf);
064    this.clusterManager = clusterManager;
065    this.connection = ConnectionFactory.createConnection(conf);
066    this.admin = this.connection.getAdmin();
067    this.initialClusterStatus = getClusterMetrics();
068  }
069
070  public void setClusterManager(ClusterManager clusterManager) {
071    this.clusterManager = clusterManager;
072  }
073
074  public ClusterManager getClusterManager() {
075    return clusterManager;
076  }
077
078  /**
079   * Returns a ClusterStatus for this HBase cluster
080   * @throws IOException
081   */
082  @Override
083  public ClusterMetrics getClusterMetrics() throws IOException {
084    return admin.getClusterMetrics();
085  }
086
087  @Override
088  public ClusterMetrics getInitialClusterMetrics() throws IOException {
089    return initialClusterStatus;
090  }
091
092  @Override
093  public void close() throws IOException {
094    if (this.admin != null) {
095      admin.close();
096    }
097    if (this.connection != null && !this.connection.isClosed()) {
098      this.connection.close();
099    }
100  }
101
102  @Override
103  public AdminProtos.AdminService.BlockingInterface getAdminProtocol(ServerName serverName)
104  throws IOException {
105    return ((ClusterConnection)this.connection).getAdmin(serverName);
106  }
107
108  @Override
109  public ClientProtos.ClientService.BlockingInterface getClientProtocol(ServerName serverName)
110  throws IOException {
111    return ((ClusterConnection)this.connection).getClient(serverName);
112  }
113
114  @Override
115  public void startRegionServer(String hostname, int port) throws IOException {
116    LOG.info("Starting RS on: {}", hostname);
117    clusterManager.start(ServiceType.HBASE_REGIONSERVER, hostname, port);
118  }
119
120  @Override
121  public void killRegionServer(ServerName serverName) throws IOException {
122    LOG.info("Aborting RS: {}", serverName.getServerName());
123    killedRegionServers.add(serverName);
124    clusterManager.kill(ServiceType.HBASE_REGIONSERVER,
125      serverName.getHostname(), serverName.getPort());
126  }
127
128  @Override
129  public boolean isKilledRS(ServerName serverName) {
130    return killedRegionServers.contains(serverName);
131  }
132
133  @Override
134  public void stopRegionServer(ServerName serverName) throws IOException {
135    LOG.info("Stopping RS: {}", serverName.getServerName());
136    clusterManager.stop(ServiceType.HBASE_REGIONSERVER,
137      serverName.getHostname(), serverName.getPort());
138  }
139
140  @Override
141  public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
142    waitForServiceToStop(ServiceType.HBASE_REGIONSERVER, serverName, timeout);
143  }
144
145  @Override
146  public void suspendRegionServer(ServerName serverName) throws IOException {
147    LOG.info("Suspend RS: {}", serverName.getServerName());
148    clusterManager.suspend(ServiceType.HBASE_REGIONSERVER,
149        serverName.getHostname(), serverName.getPort());
150  }
151
152  @Override
153  public void resumeRegionServer(ServerName serverName) throws IOException {
154    LOG.info("Resume RS: {}", serverName.getServerName());
155    clusterManager.resume(ServiceType.HBASE_REGIONSERVER,
156        serverName.getHostname(), serverName.getPort());
157  }
158
159  @Override
160  public void startZkNode(String hostname, int port) throws IOException {
161    LOG.info("Starting ZooKeeper node on: {}", hostname);
162    clusterManager.start(ServiceType.ZOOKEEPER_SERVER, hostname, port);
163  }
164
165  @Override
166  public void killZkNode(ServerName serverName) throws IOException {
167    LOG.info("Aborting ZooKeeper node on: {}", serverName.getServerName());
168    clusterManager.kill(ServiceType.ZOOKEEPER_SERVER,
169      serverName.getHostname(), serverName.getPort());
170  }
171
172  @Override
173  public void stopZkNode(ServerName serverName) throws IOException {
174    LOG.info("Stopping ZooKeeper node: {}", serverName.getServerName());
175    clusterManager.stop(ServiceType.ZOOKEEPER_SERVER,
176      serverName.getHostname(), serverName.getPort());
177  }
178
179  @Override
180  public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
181    waitForServiceToStart(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
182  }
183
184  @Override
185  public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
186    waitForServiceToStop(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
187  }
188
189  @Override
190  public void startDataNode(ServerName serverName) throws IOException {
191    LOG.info("Starting data node on: {}", serverName.getServerName());
192    clusterManager.start(ServiceType.HADOOP_DATANODE,
193      serverName.getHostname(), serverName.getPort());
194  }
195
196  @Override
197  public void killDataNode(ServerName serverName) throws IOException {
198    LOG.info("Aborting data node on: {}", serverName.getServerName());
199    clusterManager.kill(ServiceType.HADOOP_DATANODE,
200      serverName.getHostname(), serverName.getPort());
201  }
202
203  @Override
204  public void stopDataNode(ServerName serverName) throws IOException {
205    LOG.info("Stopping data node on: {}", serverName.getServerName());
206    clusterManager.stop(ServiceType.HADOOP_DATANODE,
207      serverName.getHostname(), serverName.getPort());
208  }
209
210  @Override
211  public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
212    waitForServiceToStart(ServiceType.HADOOP_DATANODE, serverName, timeout);
213  }
214
215  @Override
216  public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
217    waitForServiceToStop(ServiceType.HADOOP_DATANODE, serverName, timeout);
218  }
219
220  @Override
221  public void startNameNode(ServerName serverName) throws IOException {
222    LOG.info("Starting name node on: {}", serverName.getServerName());
223    clusterManager.start(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
224      serverName.getPort());
225  }
226
227  @Override
228  public void killNameNode(ServerName serverName) throws IOException {
229    LOG.info("Aborting name node on: {}", serverName.getServerName());
230    clusterManager.kill(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
231      serverName.getPort());
232  }
233
234  @Override
235  public void stopNameNode(ServerName serverName) throws IOException {
236    LOG.info("Stopping name node on: {}", serverName.getServerName());
237    clusterManager.stop(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
238      serverName.getPort());
239  }
240
241  @Override
242  public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException {
243    waitForServiceToStart(ServiceType.HADOOP_NAMENODE, serverName, timeout);
244  }
245
246  @Override
247  public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException {
248    waitForServiceToStop(ServiceType.HADOOP_NAMENODE, serverName, timeout);
249  }
250
251  private void waitForServiceToStop(ServiceType service, ServerName serverName, long timeout)
252    throws IOException {
253    LOG.info("Waiting for service: {} to stop: {}", service, serverName.getServerName());
254    long start = System.currentTimeMillis();
255
256    while ((System.currentTimeMillis() - start) < timeout) {
257      if (!clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
258        return;
259      }
260      Threads.sleep(100);
261    }
262    throw new IOException("did timeout waiting for service to stop:" + serverName);
263  }
264
265  private void waitForServiceToStart(ServiceType service, ServerName serverName, long timeout)
266    throws IOException {
267    LOG.info("Waiting for service: {} to start: ", service, serverName.getServerName());
268    long start = System.currentTimeMillis();
269
270    while ((System.currentTimeMillis() - start) < timeout) {
271      if (clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
272        return;
273      }
274      Threads.sleep(100);
275    }
276    throw new IOException("did timeout waiting for service to start:" + serverName);
277  }
278
279
280  @Override
281  public MasterService.BlockingInterface getMasterAdminService()
282  throws IOException {
283    return ((ClusterConnection)this.connection).getMaster();
284  }
285
286  @Override
287  public void startMaster(String hostname, int port) throws IOException {
288    LOG.info("Starting Master on: {}:{}", hostname, port);
289    clusterManager.start(ServiceType.HBASE_MASTER, hostname, port);
290  }
291
292  @Override
293  public void killMaster(ServerName serverName) throws IOException {
294    LOG.info("Aborting Master: {}", serverName.getServerName());
295    clusterManager.kill(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
296  }
297
298  @Override
299  public void stopMaster(ServerName serverName) throws IOException {
300    LOG.info("Stopping Master: {}", serverName.getServerName());
301    clusterManager.stop(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
302  }
303
304  @Override
305  public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
306    waitForServiceToStop(ServiceType.HBASE_MASTER, serverName, timeout);
307  }
308
309  @Override
310  public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
311    long start = System.currentTimeMillis();
312    while (System.currentTimeMillis() - start < timeout) {
313      try {
314        getMasterAdminService();
315        return true;
316      } catch (MasterNotRunningException m) {
317        LOG.warn("Master not started yet " + m);
318      } catch (ZooKeeperConnectionException e) {
319        LOG.warn("Failed to connect to ZK " + e);
320      }
321      Threads.sleep(1000);
322    }
323    return false;
324  }
325
326  @Override
327  public ServerName getServerHoldingRegion(TableName tn, byte[] regionName) throws IOException {
328    byte[] startKey = RegionInfo.getStartKey(regionName);
329    HRegionLocation regionLoc = null;
330    try (RegionLocator locator = connection.getRegionLocator(tn)) {
331      regionLoc = locator.getRegionLocation(startKey, true);
332    }
333    if (regionLoc == null) {
334      LOG.warn("Cannot find region server holding region {}", Bytes.toStringBinary(regionName));
335      return null;
336    }
337    return regionLoc.getServerName();
338  }
339
340  @Override
341  public void waitUntilShutDown() {
342    // Simply wait for a few seconds for now (after issuing serverManager.kill
343    throw new RuntimeException(HConstants.NOT_IMPLEMENTED);
344  }
345
346  @Override
347  public void shutdown() throws IOException {
348    // not sure we want this
349    throw new RuntimeException(HConstants.NOT_IMPLEMENTED);
350  }
351
352  @Override
353  public boolean isDistributedCluster() {
354    return true;
355  }
356
357  @Override
358  public boolean restoreClusterMetrics(ClusterMetrics initial) throws IOException {
359    ClusterMetrics current = getClusterMetrics();
360
361    LOG.info("Restoring cluster - started");
362
363    // do a best effort restore
364    boolean success = true;
365    success = restoreMasters(initial, current) & success;
366    success = restoreRegionServers(initial, current) & success;
367    success = restoreAdmin() & success;
368
369    LOG.info("Restoring cluster - done");
370    return success;
371  }
372
373  protected boolean restoreMasters(ClusterMetrics initial, ClusterMetrics current) {
374    List<IOException> deferred = new ArrayList<>();
375    //check whether current master has changed
376    final ServerName initMaster = initial.getMasterName();
377    if (!ServerName.isSameAddress(initMaster, current.getMasterName())) {
378      LOG.info("Restoring cluster - Initial active master : {} has changed to : {}",
379          initMaster.getAddress(), current.getMasterName().getAddress());
380      // If initial master is stopped, start it, before restoring the state.
381      // It will come up as a backup master, if there is already an active master.
382      try {
383        if (!clusterManager.isRunning(ServiceType.HBASE_MASTER,
384                initMaster.getHostname(), initMaster.getPort())) {
385          LOG.info("Restoring cluster - starting initial active master at:{}",
386              initMaster.getAddress());
387          startMaster(initMaster.getHostname(), initMaster.getPort());
388        }
389
390        // master has changed, we would like to undo this.
391        // 1. Kill the current backups
392        // 2. Stop current master
393        // 3. Start backup masters
394        for (ServerName currentBackup : current.getBackupMasterNames()) {
395          if (!ServerName.isSameAddress(currentBackup, initMaster)) {
396            LOG.info("Restoring cluster - stopping backup master: {}", currentBackup);
397            stopMaster(currentBackup);
398          }
399        }
400        LOG.info("Restoring cluster - stopping active master: {}", current.getMasterName());
401        stopMaster(current.getMasterName());
402        waitForActiveAndReadyMaster(); // wait so that active master takes over
403      } catch (IOException ex) {
404        // if we fail to start the initial active master, we do not want to continue stopping
405        // backup masters. Just keep what we have now
406        deferred.add(ex);
407      }
408
409      //start backup masters
410      for (ServerName backup : initial.getBackupMasterNames()) {
411        try {
412          //these are not started in backup mode, but we should already have an active master
413          if (!clusterManager.isRunning(ServiceType.HBASE_MASTER,
414                  backup.getHostname(),
415                  backup.getPort())) {
416            LOG.info("Restoring cluster - starting initial backup master: {}",
417                backup.getAddress());
418            startMaster(backup.getHostname(), backup.getPort());
419          }
420        } catch (IOException ex) {
421          deferred.add(ex);
422        }
423      }
424    } else {
425      //current master has not changed, match up backup masters
426      Set<ServerName> toStart = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
427      Set<ServerName> toKill = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
428      toStart.addAll(initial.getBackupMasterNames());
429      toKill.addAll(current.getBackupMasterNames());
430
431      for (ServerName server : current.getBackupMasterNames()) {
432        toStart.remove(server);
433      }
434      for (ServerName server: initial.getBackupMasterNames()) {
435        toKill.remove(server);
436      }
437
438      for (ServerName sn:toStart) {
439        try {
440          if(!clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
441            LOG.info("Restoring cluster - starting initial backup master: {}", sn.getAddress());
442            startMaster(sn.getHostname(), sn.getPort());
443          }
444        } catch (IOException ex) {
445          deferred.add(ex);
446        }
447      }
448
449      for (ServerName sn:toKill) {
450        try {
451          if(clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
452            LOG.info("Restoring cluster - stopping backup master: {}", sn.getAddress());
453            stopMaster(sn);
454          }
455        } catch (IOException ex) {
456          deferred.add(ex);
457        }
458      }
459    }
460    if (!deferred.isEmpty()) {
461      LOG.warn("Restoring cluster - restoring region servers reported {} errors:",
462          deferred.size());
463      for (int i=0; i<deferred.size() && i < 3; i++) {
464        LOG.warn(Objects.toString(deferred.get(i)));
465      }
466    }
467
468    return deferred.isEmpty();
469  }
470
471
472  private static class ServerNameIgnoreStartCodeComparator implements Comparator<ServerName> {
473    @Override
474    public int compare(ServerName o1, ServerName o2) {
475      int compare = o1.getHostname().compareToIgnoreCase(o2.getHostname());
476      if (compare != 0) return compare;
477      compare = o1.getPort() - o2.getPort();
478      if (compare != 0) return compare;
479      return 0;
480    }
481  }
482
483  protected boolean restoreRegionServers(ClusterMetrics initial, ClusterMetrics current) {
484    Set<ServerName> toStart = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
485    Set<ServerName> toKill = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
486    toStart.addAll(initial.getLiveServerMetrics().keySet());
487    toKill.addAll(current.getLiveServerMetrics().keySet());
488
489    ServerName master = initial.getMasterName();
490
491    for (ServerName server : current.getLiveServerMetrics().keySet()) {
492      toStart.remove(server);
493    }
494    for (ServerName server: initial.getLiveServerMetrics().keySet()) {
495      toKill.remove(server);
496    }
497
498    List<IOException> deferred = new ArrayList<>();
499
500    for(ServerName sn:toStart) {
501      try {
502        if (!clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(),
503          sn.getPort()) && master.getPort() != sn.getPort()) {
504          LOG.info("Restoring cluster - starting initial region server: {}", sn.getAddress());
505          startRegionServer(sn.getHostname(), sn.getPort());
506        }
507      } catch (IOException ex) {
508        deferred.add(ex);
509      }
510    }
511
512    for(ServerName sn:toKill) {
513      try {
514        if (clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(),
515          sn.getPort()) && master.getPort() != sn.getPort()) {
516          LOG.info("Restoring cluster - stopping initial region server: {}", sn.getAddress());
517          stopRegionServer(sn);
518        }
519      } catch (IOException ex) {
520        deferred.add(ex);
521      }
522    }
523    if (!deferred.isEmpty()) {
524      LOG.warn("Restoring cluster - restoring region servers reported {} errors:",
525          deferred.size());
526      for (int i=0; i<deferred.size() && i < 3; i++) {
527        LOG.warn(Objects.toString(deferred.get(i)));
528      }
529    }
530
531    return deferred.isEmpty();
532  }
533
534  protected boolean restoreAdmin() throws IOException {
535    // While restoring above, if the HBase Master which was initially the Active one, was down
536    // and the restore put the cluster back to Initial configuration, HAdmin instance will need
537    // to refresh its connections (otherwise it will return incorrect information) or we can
538    // point it to new instance.
539    try {
540      admin.close();
541    } catch (IOException ioe) {
542      LOG.warn("While closing the old connection", ioe);
543    }
544    this.admin = this.connection.getAdmin();
545    LOG.info("Added new HBaseAdmin");
546    return true;
547  }
548}