001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.testing; 019 020import java.util.List; 021import java.util.Optional; 022import java.util.UUID; 023import java.util.concurrent.CompletableFuture; 024import java.util.concurrent.ExecutorService; 025import java.util.concurrent.Executors; 026import java.util.stream.Collectors; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.fs.CommonConfigurationKeysPublic; 029import org.apache.hadoop.hbase.HBaseTestingUtil; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.ServerName; 032import org.apache.hadoop.hbase.StartTestingClusterOption; 033import org.apache.hadoop.hbase.client.RegionInfo; 034import org.apache.hadoop.hbase.master.HMaster; 035import org.apache.hadoop.hbase.regionserver.HRegion; 036import org.apache.hadoop.hbase.regionserver.OnlineRegions; 037import org.apache.hadoop.hbase.regionserver.Region; 038import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 039import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 040import org.apache.yetus.audience.InterfaceAudience; 041 042import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 043import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 044 045@InterfaceAudience.Private 046class TestingHBaseClusterImpl implements TestingHBaseCluster { 047 048 private final HBaseTestingUtil util; 049 050 private final StartTestingClusterOption option; 051 052 private final String externalDfsUri; 053 054 private final String externalZkConnectString; 055 056 private final ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder() 057 .setNameFormat(getClass().getSuperclass() + "-%d").setDaemon(true).build()); 058 059 private boolean miniClusterRunning = false; 060 061 private boolean miniHBaseClusterRunning = false; 062 063 TestingHBaseClusterImpl(TestingHBaseClusterOption option) { 064 this.util = new HBaseTestingUtil(option.conf()); 065 this.option = option.convert(); 066 this.externalDfsUri = option.getExternalDfsUri(); 067 this.externalZkConnectString = option.getExternalZkConnectString(); 068 } 069 070 @Override 071 public Configuration getConf() { 072 return util.getConfiguration(); 073 } 074 075 private int getRegionServerIndex(ServerName serverName) { 076 // we have a small number of region servers, this should be fine for now. 077 List<RegionServerThread> servers = util.getMiniHBaseCluster().getRegionServerThreads(); 078 for (int i = 0; i < servers.size(); i++) { 079 if (servers.get(i).getRegionServer().getServerName().equals(serverName)) { 080 return i; 081 } 082 } 083 return -1; 084 } 085 086 private int getMasterIndex(ServerName serverName) { 087 List<MasterThread> masters = util.getMiniHBaseCluster().getMasterThreads(); 088 for (int i = 0; i < masters.size(); i++) { 089 if (masters.get(i).getMaster().getServerName().equals(serverName)) { 090 return i; 091 } 092 } 093 return -1; 094 } 095 096 private void join(Thread thread, CompletableFuture<?> future) { 097 executor.execute(() -> { 098 try { 099 thread.join(); 100 future.complete(null); 101 } catch (InterruptedException e) { 102 future.completeExceptionally(e); 103 } 104 }); 105 } 106 107 @Override 108 public CompletableFuture<Void> stopMaster(ServerName serverName) throws Exception { 109 CompletableFuture<Void> future = new CompletableFuture<>(); 110 int index = getMasterIndex(serverName); 111 if (index == -1) { 112 future.completeExceptionally(new IllegalArgumentException("Unknown master " + serverName)); 113 } 114 join(util.getMiniHBaseCluster().stopMaster(index), future); 115 return future; 116 } 117 118 @Override 119 public CompletableFuture<Void> stopRegionServer(ServerName serverName) throws Exception { 120 CompletableFuture<Void> future = new CompletableFuture<>(); 121 int index = getRegionServerIndex(serverName); 122 if (index == -1) { 123 future 124 .completeExceptionally(new IllegalArgumentException("Unknown region server " + serverName)); 125 } 126 join(util.getMiniHBaseCluster().stopRegionServer(index), future); 127 return future; 128 } 129 130 @Override 131 public void stopHBaseCluster() throws Exception { 132 Preconditions.checkState(miniClusterRunning, "Cluster has already been stopped"); 133 Preconditions.checkState(miniHBaseClusterRunning, "HBase cluster has already been started"); 134 util.shutdownMiniHBaseCluster(); 135 miniHBaseClusterRunning = false; 136 } 137 138 @Override 139 public void startHBaseCluster() throws Exception { 140 Preconditions.checkState(miniClusterRunning, "Cluster has already been stopped"); 141 Preconditions.checkState(!miniHBaseClusterRunning, "HBase cluster has already been started"); 142 util.startMiniHBaseCluster(option); 143 miniHBaseClusterRunning = true; 144 } 145 146 @Override 147 public void start() throws Exception { 148 Preconditions.checkState(!miniClusterRunning, "Cluster has already been started"); 149 if (externalZkConnectString == null) { 150 util.startMiniZKCluster(); 151 } else { 152 Configuration conf = util.getConfiguration(); 153 conf.set(HConstants.ZOOKEEPER_QUORUM, externalZkConnectString); 154 conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + UUID.randomUUID().toString()); 155 } 156 if (externalDfsUri == null) { 157 util.startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts()); 158 } else { 159 Configuration conf = util.getConfiguration(); 160 conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, externalDfsUri); 161 } 162 util.startMiniHBaseCluster(option); 163 miniClusterRunning = true; 164 miniHBaseClusterRunning = true; 165 } 166 167 @Override 168 public void stop() throws Exception { 169 Preconditions.checkState(miniClusterRunning, "Cluster has already been stopped"); 170 util.shutdownMiniCluster(); 171 miniClusterRunning = false; 172 miniHBaseClusterRunning = false; 173 } 174 175 @Override 176 public boolean isHBaseClusterRunning() { 177 return miniHBaseClusterRunning; 178 } 179 180 @Override 181 public boolean isClusterRunning() { 182 return miniClusterRunning; 183 } 184 185 @Override 186 public void startMaster() throws Exception { 187 util.getMiniHBaseCluster().startMaster(); 188 } 189 190 @Override 191 public void startMaster(String hostname, int port) throws Exception { 192 util.getMiniHBaseCluster().startMaster(hostname, port); 193 } 194 195 @Override 196 public void startRegionServer() throws Exception { 197 util.getMiniHBaseCluster().startRegionServer(); 198 } 199 200 @Override 201 public void startRegionServer(String hostname, int port) throws Exception { 202 util.getMiniHBaseCluster().startRegionServer(hostname, port); 203 } 204 205 @Override 206 public Optional<ServerName> getActiveMasterAddress() { 207 return Optional.ofNullable(util.getMiniHBaseCluster().getMaster()).map(HMaster::getServerName); 208 } 209 210 @Override 211 public List<ServerName> getBackupMasterAddresses() { 212 return util.getMiniHBaseCluster().getMasterThreads().stream().map(MasterThread::getMaster) 213 .filter(m -> !m.isActiveMaster()).map(HMaster::getServerName).collect(Collectors.toList()); 214 } 215 216 @Override 217 public List<ServerName> getRegionServerAddresses() { 218 return util.getMiniHBaseCluster().getRegionServerThreads().stream() 219 .map(t -> t.getRegionServer().getServerName()).collect(Collectors.toList()); 220 } 221 222 @Override 223 public Optional<Region> getRegion(RegionInfo regionInfo) { 224 for (RegionServerThread t : util.getMiniHBaseCluster().getRegionServerThreads()) { 225 for (HRegion region : t.getRegionServer().getRegions()) { 226 if (region.getRegionInfo().equals(regionInfo)) { 227 return Optional.of(region); 228 } 229 } 230 } 231 return Optional.empty(); 232 } 233 234 @Override 235 public Optional<OnlineRegions> getOnlineRegionsInterface(ServerName serverName) { 236 return Optional.ofNullable(util.getMiniHBaseCluster().getRegionServer(serverName)); 237 } 238}