001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.testing;
019
020import java.util.List;
021import java.util.Optional;
022import java.util.UUID;
023import java.util.concurrent.CompletableFuture;
024import java.util.concurrent.ExecutorService;
025import java.util.concurrent.Executors;
026import java.util.stream.Collectors;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.ServerName;
032import org.apache.hadoop.hbase.StartTestingClusterOption;
033import org.apache.hadoop.hbase.client.RegionInfo;
034import org.apache.hadoop.hbase.master.HMaster;
035import org.apache.hadoop.hbase.regionserver.HRegion;
036import org.apache.hadoop.hbase.regionserver.OnlineRegions;
037import org.apache.hadoop.hbase.regionserver.Region;
038import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
039import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
040import org.apache.yetus.audience.InterfaceAudience;
041
042import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
043import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
044
045@InterfaceAudience.Private
046class TestingHBaseClusterImpl implements TestingHBaseCluster {
047
048  private final HBaseTestingUtil util;
049
050  private final StartTestingClusterOption option;
051
052  private final String externalDfsUri;
053
054  private final String externalZkConnectString;
055
056  private final ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
057    .setNameFormat(getClass().getSuperclass() + "-%d").setDaemon(true).build());
058
059  private boolean miniClusterRunning = false;
060
061  private boolean miniHBaseClusterRunning = false;
062
063  TestingHBaseClusterImpl(TestingHBaseClusterOption option) {
064    this.util = new HBaseTestingUtil(option.conf());
065    this.option = option.convert();
066    this.externalDfsUri = option.getExternalDfsUri();
067    this.externalZkConnectString = option.getExternalZkConnectString();
068  }
069
070  @Override
071  public Configuration getConf() {
072    return util.getConfiguration();
073  }
074
075  private int getRegionServerIndex(ServerName serverName) {
076    // we have a small number of region servers, this should be fine for now.
077    List<RegionServerThread> servers = util.getMiniHBaseCluster().getRegionServerThreads();
078    for (int i = 0; i < servers.size(); i++) {
079      if (servers.get(i).getRegionServer().getServerName().equals(serverName)) {
080        return i;
081      }
082    }
083    return -1;
084  }
085
086  private int getMasterIndex(ServerName serverName) {
087    List<MasterThread> masters = util.getMiniHBaseCluster().getMasterThreads();
088    for (int i = 0; i < masters.size(); i++) {
089      if (masters.get(i).getMaster().getServerName().equals(serverName)) {
090        return i;
091      }
092    }
093    return -1;
094  }
095
096  private void join(Thread thread, CompletableFuture<?> future) {
097    executor.execute(() -> {
098      try {
099        thread.join();
100        future.complete(null);
101      } catch (InterruptedException e) {
102        future.completeExceptionally(e);
103      }
104    });
105  }
106
107  @Override
108  public CompletableFuture<Void> stopMaster(ServerName serverName) throws Exception {
109    CompletableFuture<Void> future = new CompletableFuture<>();
110    int index = getMasterIndex(serverName);
111    if (index == -1) {
112      future.completeExceptionally(new IllegalArgumentException("Unknown master " + serverName));
113    }
114    join(util.getMiniHBaseCluster().stopMaster(index), future);
115    return future;
116  }
117
118  @Override
119  public CompletableFuture<Void> stopRegionServer(ServerName serverName) throws Exception {
120    CompletableFuture<Void> future = new CompletableFuture<>();
121    int index = getRegionServerIndex(serverName);
122    if (index == -1) {
123      future
124        .completeExceptionally(new IllegalArgumentException("Unknown region server " + serverName));
125    }
126    join(util.getMiniHBaseCluster().stopRegionServer(index), future);
127    return future;
128  }
129
130  @Override
131  public void stopHBaseCluster() throws Exception {
132    Preconditions.checkState(miniClusterRunning, "Cluster has already been stopped");
133    Preconditions.checkState(miniHBaseClusterRunning, "HBase cluster has already been started");
134    util.shutdownMiniHBaseCluster();
135    miniHBaseClusterRunning = false;
136  }
137
138  @Override
139  public void startHBaseCluster() throws Exception {
140    Preconditions.checkState(miniClusterRunning, "Cluster has already been stopped");
141    Preconditions.checkState(!miniHBaseClusterRunning, "HBase cluster has already been started");
142    util.startMiniHBaseCluster(option);
143    miniHBaseClusterRunning = true;
144  }
145
146  @Override
147  public void start() throws Exception {
148    Preconditions.checkState(!miniClusterRunning, "Cluster has already been started");
149    if (externalZkConnectString == null) {
150      util.startMiniZKCluster();
151    } else {
152      Configuration conf = util.getConfiguration();
153      conf.set(HConstants.ZOOKEEPER_QUORUM, externalZkConnectString);
154      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + UUID.randomUUID().toString());
155    }
156    if (externalDfsUri == null) {
157      util.startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
158    } else {
159      Configuration conf = util.getConfiguration();
160      conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, externalDfsUri);
161    }
162    util.startMiniHBaseCluster(option);
163    miniClusterRunning = true;
164    miniHBaseClusterRunning = true;
165  }
166
167  @Override
168  public void stop() throws Exception {
169    Preconditions.checkState(miniClusterRunning, "Cluster has already been stopped");
170    util.shutdownMiniCluster();
171    miniClusterRunning = false;
172    miniHBaseClusterRunning = false;
173  }
174
175  @Override
176  public boolean isHBaseClusterRunning() {
177    return miniHBaseClusterRunning;
178  }
179
180  @Override
181  public boolean isClusterRunning() {
182    return miniClusterRunning;
183  }
184
185  @Override
186  public void startMaster() throws Exception {
187    util.getMiniHBaseCluster().startMaster();
188  }
189
190  @Override
191  public void startMaster(String hostname, int port) throws Exception {
192    util.getMiniHBaseCluster().startMaster(hostname, port);
193  }
194
195  @Override
196  public void startRegionServer() throws Exception {
197    util.getMiniHBaseCluster().startRegionServer();
198  }
199
200  @Override
201  public void startRegionServer(String hostname, int port) throws Exception {
202    util.getMiniHBaseCluster().startRegionServer(hostname, port);
203  }
204
205  @Override
206  public Optional<ServerName> getActiveMasterAddress() {
207    return Optional.ofNullable(util.getMiniHBaseCluster().getMaster()).map(HMaster::getServerName);
208  }
209
210  @Override
211  public List<ServerName> getBackupMasterAddresses() {
212    return util.getMiniHBaseCluster().getMasterThreads().stream().map(MasterThread::getMaster)
213      .filter(m -> !m.isActiveMaster()).map(HMaster::getServerName).collect(Collectors.toList());
214  }
215
216  @Override
217  public List<ServerName> getRegionServerAddresses() {
218    return util.getMiniHBaseCluster().getRegionServerThreads().stream()
219      .map(t -> t.getRegionServer().getServerName()).collect(Collectors.toList());
220  }
221
222  @Override
223  public Optional<Region> getRegion(RegionInfo regionInfo) {
224    for (RegionServerThread t : util.getMiniHBaseCluster().getRegionServerThreads()) {
225      for (HRegion region : t.getRegionServer().getRegions()) {
226        if (region.getRegionInfo().equals(regionInfo)) {
227          return Optional.of(region);
228        }
229      }
230    }
231    return Optional.empty();
232  }
233
234  @Override
235  public Optional<OnlineRegions> getOnlineRegionsInterface(ServerName serverName) {
236    return Optional.ofNullable(util.getMiniHBaseCluster().getRegionServer(serverName));
237  }
238}