001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Comparator;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Objects;
026import java.util.Set;
027import java.util.TreeSet;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.ClusterManager.ServiceType;
030import org.apache.hadoop.hbase.client.Admin;
031import org.apache.hadoop.hbase.client.ClusterConnection;
032import org.apache.hadoop.hbase.client.Connection;
033import org.apache.hadoop.hbase.client.ConnectionFactory;
034import org.apache.hadoop.hbase.client.RegionInfo;
035import org.apache.hadoop.hbase.client.RegionLocator;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.Threads;
038import org.apache.yetus.audience.InterfaceAudience;
039
040import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
041import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
043
044/**
045 * Manages the interactions with an already deployed distributed cluster (as opposed to
046 * a pseudo-distributed, or mini/local cluster). This is used by integration and system tests.
047 */
048@InterfaceAudience.Private
049public class DistributedHBaseCluster extends HBaseCluster {
050  private Admin admin;
051  private final Connection connection;
052
053  private ClusterManager clusterManager;
054  /**
055   * List of RegionServers killed so far. ServerName also comprises startCode of a server,
056   * so any restarted instances of the same server will have different ServerName and will not
057   * coincide with past dead ones. So there's no need to cleanup this list.
058   */
059  private Set<ServerName> killedRegionServers = new HashSet<>();
060
061  public DistributedHBaseCluster(Configuration conf, ClusterManager clusterManager)
062      throws IOException {
063    super(conf);
064    this.clusterManager = clusterManager;
065    this.connection = ConnectionFactory.createConnection(conf);
066    this.admin = this.connection.getAdmin();
067    this.initialClusterStatus = getClusterMetrics();
068  }
069
070  public void setClusterManager(ClusterManager clusterManager) {
071    this.clusterManager = clusterManager;
072  }
073
074  public ClusterManager getClusterManager() {
075    return clusterManager;
076  }
077
078  /**
079   * Returns a ClusterStatus for this HBase cluster
080   * @throws IOException
081   */
082  @Override
083  public ClusterMetrics getClusterMetrics() throws IOException {
084    return admin.getClusterMetrics();
085  }
086
087  @Override
088  public ClusterMetrics getInitialClusterMetrics() throws IOException {
089    return initialClusterStatus;
090  }
091
092  @Override
093  public void close() throws IOException {
094    if (this.admin != null) {
095      admin.close();
096    }
097    if (this.connection != null && !this.connection.isClosed()) {
098      this.connection.close();
099    }
100  }
101
102  @Override
103  public AdminProtos.AdminService.BlockingInterface getAdminProtocol(ServerName serverName)
104  throws IOException {
105    return ((ClusterConnection)this.connection).getAdmin(serverName);
106  }
107
108  @Override
109  public ClientProtos.ClientService.BlockingInterface getClientProtocol(ServerName serverName)
110  throws IOException {
111    return ((ClusterConnection)this.connection).getClient(serverName);
112  }
113
114  @Override
115  public void startRegionServer(String hostname, int port) throws IOException {
116    LOG.info("Starting RS on: " + hostname);
117    clusterManager.start(ServiceType.HBASE_REGIONSERVER, hostname, port);
118  }
119
120  @Override
121  public void killRegionServer(ServerName serverName) throws IOException {
122    LOG.info("Aborting RS: " + serverName.getServerName());
123    killedRegionServers.add(serverName);
124    clusterManager.kill(ServiceType.HBASE_REGIONSERVER,
125      serverName.getHostname(), serverName.getPort());
126  }
127
128  @Override
129  public boolean isKilledRS(ServerName serverName) {
130    return killedRegionServers.contains(serverName);
131  }
132
133  @Override
134  public void stopRegionServer(ServerName serverName) throws IOException {
135    LOG.info("Stopping RS: " + serverName.getServerName());
136    clusterManager.stop(ServiceType.HBASE_REGIONSERVER,
137      serverName.getHostname(), serverName.getPort());
138  }
139
140  @Override
141  public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
142    waitForServiceToStop(ServiceType.HBASE_REGIONSERVER, serverName, timeout);
143  }
144
145  @Override
146  public void startZkNode(String hostname, int port) throws IOException {
147    LOG.info("Starting ZooKeeper node on: " + hostname);
148    clusterManager.start(ServiceType.ZOOKEEPER_SERVER, hostname, port);
149  }
150
151  @Override
152  public void killZkNode(ServerName serverName) throws IOException {
153    LOG.info("Aborting ZooKeeper node on: " + serverName.getServerName());
154    clusterManager.kill(ServiceType.ZOOKEEPER_SERVER,
155      serverName.getHostname(), serverName.getPort());
156  }
157
158  @Override
159  public void stopZkNode(ServerName serverName) throws IOException {
160    LOG.info("Stopping ZooKeeper node: " + serverName.getServerName());
161    clusterManager.stop(ServiceType.ZOOKEEPER_SERVER,
162      serverName.getHostname(), serverName.getPort());
163  }
164
165  @Override
166  public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
167    waitForServiceToStart(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
168  }
169
170  @Override
171  public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
172    waitForServiceToStop(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
173  }
174
175  @Override
176  public void startDataNode(ServerName serverName) throws IOException {
177    LOG.info("Starting data node on: " + serverName.getServerName());
178    clusterManager.start(ServiceType.HADOOP_DATANODE,
179      serverName.getHostname(), serverName.getPort());
180  }
181
182  @Override
183  public void killDataNode(ServerName serverName) throws IOException {
184    LOG.info("Aborting data node on: " + serverName.getServerName());
185    clusterManager.kill(ServiceType.HADOOP_DATANODE,
186      serverName.getHostname(), serverName.getPort());
187  }
188
189  @Override
190  public void stopDataNode(ServerName serverName) throws IOException {
191    LOG.info("Stopping data node on: " + serverName.getServerName());
192    clusterManager.stop(ServiceType.HADOOP_DATANODE,
193      serverName.getHostname(), serverName.getPort());
194  }
195
196  @Override
197  public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
198    waitForServiceToStart(ServiceType.HADOOP_DATANODE, serverName, timeout);
199  }
200
201  @Override
202  public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
203    waitForServiceToStop(ServiceType.HADOOP_DATANODE, serverName, timeout);
204  }
205
206  private void waitForServiceToStop(ServiceType service, ServerName serverName, long timeout)
207    throws IOException {
208    LOG.info("Waiting for service: " + service + " to stop: " + serverName.getServerName());
209    long start = System.currentTimeMillis();
210
211    while ((System.currentTimeMillis() - start) < timeout) {
212      if (!clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
213        return;
214      }
215      Threads.sleep(100);
216    }
217    throw new IOException("did timeout waiting for service to stop:" + serverName);
218  }
219
220  private void waitForServiceToStart(ServiceType service, ServerName serverName, long timeout)
221    throws IOException {
222    LOG.info("Waiting for service: " + service + " to start: " + serverName.getServerName());
223    long start = System.currentTimeMillis();
224
225    while ((System.currentTimeMillis() - start) < timeout) {
226      if (clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
227        return;
228      }
229      Threads.sleep(100);
230    }
231    throw new IOException("did timeout waiting for service to start:" + serverName);
232  }
233
234
235  @Override
236  public MasterService.BlockingInterface getMasterAdminService()
237  throws IOException {
238    return ((ClusterConnection)this.connection).getMaster();
239  }
240
241  @Override
242  public void startMaster(String hostname, int port) throws IOException {
243    LOG.info("Starting Master on: " + hostname + ":" + port);
244    clusterManager.start(ServiceType.HBASE_MASTER, hostname, port);
245  }
246
247  @Override
248  public void killMaster(ServerName serverName) throws IOException {
249    LOG.info("Aborting Master: " + serverName.getServerName());
250    clusterManager.kill(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
251  }
252
253  @Override
254  public void stopMaster(ServerName serverName) throws IOException {
255    LOG.info("Stopping Master: " + serverName.getServerName());
256    clusterManager.stop(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
257  }
258
259  @Override
260  public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
261    waitForServiceToStop(ServiceType.HBASE_MASTER, serverName, timeout);
262  }
263
264  @Override
265  public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
266    long start = System.currentTimeMillis();
267    while (System.currentTimeMillis() - start < timeout) {
268      try {
269        getMasterAdminService();
270        return true;
271      } catch (MasterNotRunningException m) {
272        LOG.warn("Master not started yet " + m);
273      } catch (ZooKeeperConnectionException e) {
274        LOG.warn("Failed to connect to ZK " + e);
275      }
276      Threads.sleep(1000);
277    }
278    return false;
279  }
280
281  @Override
282  public ServerName getServerHoldingRegion(TableName tn, byte[] regionName) throws IOException {
283    byte[] startKey = RegionInfo.getStartKey(regionName);
284    HRegionLocation regionLoc = null;
285    try (RegionLocator locator = connection.getRegionLocator(tn)) {
286      regionLoc = locator.getRegionLocation(startKey, true);
287    }
288    if (regionLoc == null) {
289      LOG.warn("Cannot find region server holding region " + Bytes.toStringBinary(regionName));
290      return null;
291    }
292    return regionLoc.getServerName();
293  }
294
295  @Override
296  public void waitUntilShutDown() {
297    // Simply wait for a few seconds for now (after issuing serverManager.kill
298    throw new RuntimeException(HConstants.NOT_IMPLEMENTED);
299  }
300
301  @Override
302  public void shutdown() throws IOException {
303    // not sure we want this
304    throw new RuntimeException(HConstants.NOT_IMPLEMENTED);
305  }
306
307  @Override
308  public boolean isDistributedCluster() {
309    return true;
310  }
311
312  @Override
313  public boolean restoreClusterMetrics(ClusterMetrics initial) throws IOException {
314    ClusterMetrics current = getClusterMetrics();
315
316    LOG.info("Restoring cluster - started");
317
318    // do a best effort restore
319    boolean success = true;
320    success = restoreMasters(initial, current) & success;
321    success = restoreRegionServers(initial, current) & success;
322    success = restoreAdmin() & success;
323
324    LOG.info("Restoring cluster - done");
325    return success;
326  }
327
328  protected boolean restoreMasters(ClusterMetrics initial, ClusterMetrics current) {
329    List<IOException> deferred = new ArrayList<>();
330    //check whether current master has changed
331    final ServerName initMaster = initial.getMasterName();
332    if (!ServerName.isSameAddress(initMaster, current.getMasterName())) {
333      LOG.info("Restoring cluster - Initial active master : " + initMaster.getAddress() +
334        " has changed to : " + current.getMasterName().getAddress());
335      // If initial master is stopped, start it, before restoring the state.
336      // It will come up as a backup master, if there is already an active master.
337      try {
338        if (!clusterManager.isRunning(ServiceType.HBASE_MASTER,
339                initMaster.getHostname(), initMaster.getPort())) {
340          LOG.info("Restoring cluster - starting initial active master at:"
341                  + initMaster.getAddress());
342          startMaster(initMaster.getHostname(), initMaster.getPort());
343        }
344
345        // master has changed, we would like to undo this.
346        // 1. Kill the current backups
347        // 2. Stop current master
348        // 3. Start backup masters
349        for (ServerName currentBackup : current.getBackupMasterNames()) {
350          if (!ServerName.isSameAddress(currentBackup, initMaster)) {
351            LOG.info("Restoring cluster - stopping backup master: " + currentBackup);
352            stopMaster(currentBackup);
353          }
354        }
355        LOG.info("Restoring cluster - stopping active master: " + current.getMasterName());
356        stopMaster(current.getMasterName());
357        waitForActiveAndReadyMaster(); // wait so that active master takes over
358      } catch (IOException ex) {
359        // if we fail to start the initial active master, we do not want to continue stopping
360        // backup masters. Just keep what we have now
361        deferred.add(ex);
362      }
363
364      //start backup masters
365      for (ServerName backup : initial.getBackupMasterNames()) {
366        try {
367          //these are not started in backup mode, but we should already have an active master
368          if (!clusterManager.isRunning(ServiceType.HBASE_MASTER,
369                  backup.getHostname(),
370                  backup.getPort())) {
371            LOG.info("Restoring cluster - starting initial backup master: "
372                    + backup.getAddress());
373            startMaster(backup.getHostname(), backup.getPort());
374          }
375        } catch (IOException ex) {
376          deferred.add(ex);
377        }
378      }
379    } else {
380      //current master has not changed, match up backup masters
381      Set<ServerName> toStart = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
382      Set<ServerName> toKill = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
383      toStart.addAll(initial.getBackupMasterNames());
384      toKill.addAll(current.getBackupMasterNames());
385
386      for (ServerName server : current.getBackupMasterNames()) {
387        toStart.remove(server);
388      }
389      for (ServerName server: initial.getBackupMasterNames()) {
390        toKill.remove(server);
391      }
392
393      for (ServerName sn:toStart) {
394        try {
395          if(!clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
396            LOG.info("Restoring cluster - starting initial backup master: " + sn.getAddress());
397            startMaster(sn.getHostname(), sn.getPort());
398          }
399        } catch (IOException ex) {
400          deferred.add(ex);
401        }
402      }
403
404      for (ServerName sn:toKill) {
405        try {
406          if(clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
407            LOG.info("Restoring cluster - stopping backup master: " + sn.getAddress());
408            stopMaster(sn);
409          }
410        } catch (IOException ex) {
411          deferred.add(ex);
412        }
413      }
414    }
415    if (!deferred.isEmpty()) {
416      LOG.warn("Restoring cluster - restoring region servers reported "
417              + deferred.size() + " errors:");
418      for (int i=0; i<deferred.size() && i < 3; i++) {
419        LOG.warn(Objects.toString(deferred.get(i)));
420      }
421    }
422
423    return deferred.isEmpty();
424  }
425
426
427  private static class ServerNameIgnoreStartCodeComparator implements Comparator<ServerName> {
428    @Override
429    public int compare(ServerName o1, ServerName o2) {
430      int compare = o1.getHostname().compareToIgnoreCase(o2.getHostname());
431      if (compare != 0) return compare;
432      compare = o1.getPort() - o2.getPort();
433      if (compare != 0) return compare;
434      return 0;
435    }
436  }
437
438  protected boolean restoreRegionServers(ClusterMetrics initial, ClusterMetrics current) {
439    Set<ServerName> toStart = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
440    Set<ServerName> toKill = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
441    toStart.addAll(initial.getLiveServerMetrics().keySet());
442    toKill.addAll(current.getLiveServerMetrics().keySet());
443
444    ServerName master = initial.getMasterName();
445
446    for (ServerName server : current.getLiveServerMetrics().keySet()) {
447      toStart.remove(server);
448    }
449    for (ServerName server: initial.getLiveServerMetrics().keySet()) {
450      toKill.remove(server);
451    }
452
453    List<IOException> deferred = new ArrayList<>();
454
455    for(ServerName sn:toStart) {
456      try {
457        if (!clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(),
458          sn.getPort()) && master.getPort() != sn.getPort()) {
459          LOG.info("Restoring cluster - starting initial region server: " + sn.getAddress());
460          startRegionServer(sn.getHostname(), sn.getPort());
461        }
462      } catch (IOException ex) {
463        deferred.add(ex);
464      }
465    }
466
467    for(ServerName sn:toKill) {
468      try {
469        if (clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(),
470          sn.getPort()) && master.getPort() != sn.getPort()) {
471          LOG.info("Restoring cluster - stopping initial region server: " + sn.getAddress());
472          stopRegionServer(sn);
473        }
474      } catch (IOException ex) {
475        deferred.add(ex);
476      }
477    }
478    if (!deferred.isEmpty()) {
479      LOG.warn("Restoring cluster - restoring region servers reported "
480              + deferred.size() + " errors:");
481      for (int i=0; i<deferred.size() && i < 3; i++) {
482        LOG.warn(Objects.toString(deferred.get(i)));
483      }
484    }
485
486    return deferred.isEmpty();
487  }
488
489  protected boolean restoreAdmin() throws IOException {
490    // While restoring above, if the HBase Master which was initially the Active one, was down
491    // and the restore put the cluster back to Initial configuration, HAdmin instance will need
492    // to refresh its connections (otherwise it will return incorrect information) or we can
493    // point it to new instance.
494    try {
495      admin.close();
496    } catch (IOException ioe) {
497      LOG.warn("While closing the old connection", ioe);
498    }
499    this.admin = this.connection.getAdmin();
500    LOG.info("Added new HBaseAdmin");
501    return true;
502  }
503}