001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.assignment; 019import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; 020import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; 021import static org.mockito.ArgumentMatchers.any; 022 023import java.io.IOException; 024import java.util.List; 025import java.util.Map; 026import java.util.NavigableMap; 027import java.util.SortedSet; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.CoordinatedStateManager; 031import org.apache.hadoop.hbase.ServerMetricsBuilder; 032import org.apache.hadoop.hbase.ServerName; 033import org.apache.hadoop.hbase.TableDescriptors; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.ClusterConnection; 036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 037import org.apache.hadoop.hbase.client.HConnectionTestingUtility; 038import org.apache.hadoop.hbase.client.RegionInfoBuilder; 039import org.apache.hadoop.hbase.client.TableDescriptor; 040import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 041import org.apache.hadoop.hbase.client.TableState; 042import org.apache.hadoop.hbase.master.LoadBalancer; 043import org.apache.hadoop.hbase.master.MasterFileSystem; 044import org.apache.hadoop.hbase.master.MasterServices; 045import org.apache.hadoop.hbase.master.MasterWalManager; 046import org.apache.hadoop.hbase.master.MockNoopMasterServices; 047import org.apache.hadoop.hbase.master.ServerManager; 048import org.apache.hadoop.hbase.master.SplitWALManager; 049import org.apache.hadoop.hbase.master.TableStateManager; 050import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; 051import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; 052import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 053import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher; 054import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 055import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 056import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 057import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; 058import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 059import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener; 060import org.apache.hadoop.hbase.security.Superusers; 061import org.apache.hadoop.hbase.util.CommonFSUtils; 062import org.apache.zookeeper.KeeperException; 063import org.mockito.Mockito; 064import org.mockito.invocation.InvocationOnMock; 065import org.mockito.stubbing.Answer; 066 067import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 068 069import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 070import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 071import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 072import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 078 079/** 080 * A mocked master services. 081 * Tries to fake it. May not always work. 082 */ 083public class MockMasterServices extends MockNoopMasterServices { 084 private final MasterFileSystem fileSystemManager; 085 private final MasterWalManager walManager; 086 private final SplitWALManager splitWALManager; 087 private final AssignmentManager assignmentManager; 088 private final TableStateManager tableStateManager; 089 090 private MasterProcedureEnv procedureEnv; 091 private ProcedureExecutor<MasterProcedureEnv> procedureExecutor; 092 private ProcedureStore procedureStore; 093 private final ClusterConnection connection; 094 private final LoadBalancer balancer; 095 private final ServerManager serverManager; 096 097 private final ProcedureEvent<?> initialized = new ProcedureEvent<>("master initialized"); 098 public static final String DEFAULT_COLUMN_FAMILY_NAME = "cf"; 099 public static final ServerName MOCK_MASTER_SERVERNAME = 100 ServerName.valueOf("mockmaster.example.org", 1234, -1L); 101 102 public MockMasterServices(Configuration conf, 103 NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers) throws IOException { 104 super(conf); 105 Superusers.initialize(conf); 106 this.fileSystemManager = new MasterFileSystem(conf); 107 this.walManager = new MasterWalManager(this); 108 this.splitWALManager = 109 conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)? 110 null: new SplitWALManager(this); 111 112 // Mock an AM. 113 this.assignmentManager = new AssignmentManager(this, new MockRegionStateStore(this)); 114 this.balancer = LoadBalancerFactory.getLoadBalancer(conf); 115 this.serverManager = new ServerManager(this); 116 this.tableStateManager = Mockito.mock(TableStateManager.class); 117 Mockito.when(this.tableStateManager.getTableState(Mockito.any())). 118 thenReturn(new TableState(TableName.valueOf("AnyTableNameSetInMockMasterServcies"), 119 TableState.State.ENABLED)); 120 121 // Mock up a Client Interface 122 ClientProtos.ClientService.BlockingInterface ri = 123 Mockito.mock(ClientProtos.ClientService.BlockingInterface.class); 124 MutateResponse.Builder builder = MutateResponse.newBuilder(); 125 builder.setProcessed(true); 126 try { 127 Mockito.when(ri.mutate(any(), any())).thenReturn(builder.build()); 128 } catch (ServiceException se) { 129 throw ProtobufUtil.handleRemoteException(se); 130 } 131 try { 132 Mockito.when(ri.multi(any(), any())).thenAnswer(new Answer<MultiResponse>() { 133 @Override 134 public MultiResponse answer(InvocationOnMock invocation) throws Throwable { 135 return buildMultiResponse(invocation.getArgument(1)); 136 } 137 }); 138 } catch (ServiceException se) { 139 throw ProtobufUtil.getRemoteException(se); 140 } 141 // Mock n ClusterConnection and an AdminProtocol implementation. Have the 142 // ClusterConnection return the HRI. Have the HRI return a few mocked up responses 143 // to make our test work. 144 this.connection = 145 HConnectionTestingUtility.getMockedConnectionAndDecorate(getConfiguration(), 146 Mockito.mock(AdminProtos.AdminService.BlockingInterface.class), ri, 147 MOCK_MASTER_SERVERNAME, RegionInfoBuilder.FIRST_META_REGIONINFO); 148 // Set hbase.rootdir into test dir. 149 Path rootdir = CommonFSUtils.getRootDir(getConfiguration()); 150 CommonFSUtils.setRootDir(getConfiguration(), rootdir); 151 } 152 153 public void start(final int numServes, final RSProcedureDispatcher remoteDispatcher) 154 throws IOException, KeeperException { 155 startProcedureExecutor(remoteDispatcher); 156 this.assignmentManager.start(); 157 for (int i = 0; i < numServes; ++i) { 158 ServerName sn = ServerName.valueOf("localhost", 100 + i, 1); 159 serverManager.regionServerReport(sn, ServerMetricsBuilder.of(sn)); 160 } 161 this.procedureExecutor.getEnvironment().setEventReady(initialized, true); 162 } 163 164 /** 165 * Call this restart method only after running MockMasterServices#start() 166 * The RSs can be differentiated by the port number, see 167 * ServerName in MockMasterServices#start() method above. 168 * Restart of region server will have new startcode in server name 169 * 170 * @param serverName Server name to be restarted 171 */ 172 public void restartRegionServer(ServerName serverName) throws IOException { 173 List<ServerName> onlineServers = serverManager.getOnlineServersList(); 174 long startCode = -1; 175 for (ServerName s : onlineServers) { 176 if (s.getAddress().equals(serverName.getAddress())) { 177 startCode = s.getStartcode() + 1; 178 break; 179 } 180 } 181 if (startCode == -1) { 182 return; 183 } 184 ServerName sn = ServerName.valueOf(serverName.getAddress().toString(), startCode); 185 serverManager.regionServerReport(sn, ServerMetricsBuilder.of(sn)); 186 } 187 188 @Override 189 public void stop(String why) { 190 stopProcedureExecutor(); 191 this.assignmentManager.stop(); 192 } 193 194 private void startProcedureExecutor(final RSProcedureDispatcher remoteDispatcher) 195 throws IOException { 196 final Configuration conf = getConfiguration(); 197 this.procedureStore = new NoopProcedureStore(); 198 this.procedureStore.registerListener(new ProcedureStoreListener() { 199 200 @Override 201 public void abortProcess() { 202 abort("The Procedure Store lost the lease", null); 203 } 204 }); 205 206 this.procedureEnv = new MasterProcedureEnv(this, 207 remoteDispatcher != null ? remoteDispatcher : new RSProcedureDispatcher(this)); 208 209 this.procedureExecutor = new ProcedureExecutor<>(conf, procedureEnv, procedureStore, 210 procedureEnv.getProcedureScheduler()); 211 212 final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 213 Math.max(Runtime.getRuntime().availableProcessors(), 214 MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS)); 215 final boolean abortOnCorruption = conf.getBoolean( 216 MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION, 217 MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION); 218 this.procedureStore.start(numThreads); 219 ProcedureTestingUtility.initAndStartWorkers(procedureExecutor, numThreads, abortOnCorruption); 220 this.procedureEnv.getRemoteDispatcher().start(); 221 } 222 223 private void stopProcedureExecutor() { 224 if (this.procedureEnv != null) { 225 this.procedureEnv.getRemoteDispatcher().stop(); 226 } 227 228 if (this.procedureExecutor != null) { 229 this.procedureExecutor.stop(); 230 } 231 232 if (this.procedureStore != null) { 233 this.procedureStore.stop(isAborted()); 234 } 235 } 236 237 @Override 238 public boolean isInitialized() { 239 return true; 240 } 241 242 @Override 243 public ProcedureEvent<?> getInitializedEvent() { 244 return this.initialized; 245 } 246 247 @Override 248 public MasterFileSystem getMasterFileSystem() { 249 return fileSystemManager; 250 } 251 252 @Override 253 public MasterWalManager getMasterWalManager() { 254 return walManager; 255 } 256 257 @Override 258 public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() { 259 return procedureExecutor; 260 } 261 262 @Override 263 public LoadBalancer getLoadBalancer() { 264 return balancer; 265 } 266 267 @Override 268 public ServerManager getServerManager() { 269 return serverManager; 270 } 271 272 @Override 273 public AssignmentManager getAssignmentManager() { 274 return assignmentManager; 275 } 276 277 @Override 278 public TableStateManager getTableStateManager() { 279 return tableStateManager; 280 } 281 282 @Override 283 public ClusterConnection getConnection() { 284 return this.connection; 285 } 286 287 @Override 288 public ServerName getServerName() { 289 return MOCK_MASTER_SERVERNAME; 290 } 291 292 @Override 293 public CoordinatedStateManager getCoordinatedStateManager() { 294 return super.getCoordinatedStateManager(); 295 } 296 297 private static class MockRegionStateStore extends RegionStateStore { 298 public MockRegionStateStore(final MasterServices master) { 299 super(master); 300 } 301 302 @Override 303 public void updateRegionLocation(RegionStateNode regionNode) throws IOException { 304 } 305 } 306 307 @Override 308 public TableDescriptors getTableDescriptors() { 309 return new TableDescriptors() { 310 @Override 311 public TableDescriptor remove(TableName tablename) throws IOException { 312 // noop 313 return null; 314 } 315 316 @Override 317 public Map<String, TableDescriptor> getAll() throws IOException { 318 // noop 319 return null; 320 } 321 322 @Override 323 public TableDescriptor get(TableName tablename) throws IOException { 324 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tablename); 325 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(DEFAULT_COLUMN_FAMILY_NAME)); 326 return builder.build(); 327 } 328 329 @Override 330 public Map<String, TableDescriptor> getByNamespace(String name) throws IOException { 331 return null; 332 } 333 334 @Override 335 public void update(TableDescriptor htd, boolean cacheOnly) throws IOException { 336 // noop 337 } 338 }; 339 } 340 341 private static MultiResponse buildMultiResponse(MultiRequest req) { 342 MultiResponse.Builder builder = MultiResponse.newBuilder(); 343 RegionActionResult.Builder regionActionResultBuilder = 344 RegionActionResult.newBuilder(); 345 ResultOrException.Builder roeBuilder = ResultOrException.newBuilder(); 346 for (RegionAction regionAction: req.getRegionActionList()) { 347 regionActionResultBuilder.clear(); 348 for (ClientProtos.Action action: regionAction.getActionList()) { 349 roeBuilder.clear(); 350 roeBuilder.setResult(ClientProtos.Result.getDefaultInstance()); 351 roeBuilder.setIndex(action.getIndex()); 352 regionActionResultBuilder.addResultOrException(roeBuilder.build()); 353 } 354 builder.addRegionActionResult(regionActionResultBuilder.build()); 355 } 356 return builder.build(); 357 } 358 359 @Override public SplitWALManager getSplitWALManager() { 360 return splitWALManager; 361 } 362}