001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.assignment; 019 020import static org.mockito.ArgumentMatchers.any; 021 022import java.io.IOException; 023import java.util.List; 024import java.util.Map; 025import java.util.NavigableMap; 026import java.util.SortedSet; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.CoordinatedStateManager; 030import org.apache.hadoop.hbase.ServerMetricsBuilder; 031import org.apache.hadoop.hbase.ServerName; 032import org.apache.hadoop.hbase.TableDescriptors; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.ClusterConnection; 035import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 036import org.apache.hadoop.hbase.client.HConnectionTestingUtility; 037import org.apache.hadoop.hbase.client.RegionInfoBuilder; 038import org.apache.hadoop.hbase.client.TableDescriptor; 039import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 040import org.apache.hadoop.hbase.client.TableState; 041import org.apache.hadoop.hbase.master.LoadBalancer; 042import org.apache.hadoop.hbase.master.MasterFileSystem; 043import org.apache.hadoop.hbase.master.MasterServices; 044import org.apache.hadoop.hbase.master.MasterWalManager; 045import org.apache.hadoop.hbase.master.MockNoopMasterServices; 046import org.apache.hadoop.hbase.master.ServerManager; 047import org.apache.hadoop.hbase.master.TableStateManager; 048import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; 049import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; 050import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 051import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher; 052import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 053import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 054import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 055import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; 056import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 057import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener; 058import org.apache.hadoop.hbase.security.Superusers; 059import org.apache.hadoop.hbase.util.CommonFSUtils; 060import org.apache.zookeeper.KeeperException; 061import org.mockito.Mockito; 062import org.mockito.invocation.InvocationOnMock; 063import org.mockito.stubbing.Answer; 064 065import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 066 067import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 068import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 069import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 070import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 071import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 072import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 076 077/** 078 * A mocked master services. 079 * Tries to fake it. May not always work. 080 */ 081public class MockMasterServices extends MockNoopMasterServices { 082 private final MasterFileSystem fileSystemManager; 083 private final MasterWalManager walManager; 084 private final AssignmentManager assignmentManager; 085 private final TableStateManager tableStateManager; 086 087 private MasterProcedureEnv procedureEnv; 088 private ProcedureExecutor<MasterProcedureEnv> procedureExecutor; 089 private ProcedureStore procedureStore; 090 private final ClusterConnection connection; 091 private final LoadBalancer balancer; 092 private final ServerManager serverManager; 093 094 private final ProcedureEvent<?> initialized = new ProcedureEvent<>("master initialized"); 095 public static final String DEFAULT_COLUMN_FAMILY_NAME = "cf"; 096 public static final ServerName MOCK_MASTER_SERVERNAME = 097 ServerName.valueOf("mockmaster.example.org", 1234, -1L); 098 099 public MockMasterServices(Configuration conf, 100 NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers) throws IOException { 101 super(conf); 102 Superusers.initialize(conf); 103 this.fileSystemManager = new MasterFileSystem(conf); 104 this.walManager = new MasterWalManager(this); 105 // Mock an AM. 106 this.assignmentManager = new AssignmentManager(this, new MockRegionStateStore(this)) { 107 @Override 108 public boolean isTableEnabled(final TableName tableName) { 109 return true; 110 } 111 112 @Override 113 public boolean isTableDisabled(final TableName tableName) { 114 return false; 115 } 116 }; 117 this.balancer = LoadBalancerFactory.getLoadBalancer(conf); 118 this.serverManager = new ServerManager(this); 119 this.tableStateManager = Mockito.mock(TableStateManager.class); 120 Mockito.when(this.tableStateManager.getTableState(Mockito.any())). 121 thenReturn(new TableState(TableName.valueOf("AnyTableNameSetInMockMasterServcies"), 122 TableState.State.ENABLED)); 123 124 // Mock up a Client Interface 125 ClientProtos.ClientService.BlockingInterface ri = 126 Mockito.mock(ClientProtos.ClientService.BlockingInterface.class); 127 MutateResponse.Builder builder = MutateResponse.newBuilder(); 128 builder.setProcessed(true); 129 try { 130 Mockito.when(ri.mutate(any(), any())).thenReturn(builder.build()); 131 } catch (ServiceException se) { 132 throw ProtobufUtil.handleRemoteException(se); 133 } 134 try { 135 Mockito.when(ri.multi(any(), any())).thenAnswer(new Answer<MultiResponse>() { 136 @Override 137 public MultiResponse answer(InvocationOnMock invocation) throws Throwable { 138 return buildMultiResponse(invocation.getArgument(1)); 139 } 140 }); 141 } catch (ServiceException se) { 142 throw ProtobufUtil.getRemoteException(se); 143 } 144 // Mock n ClusterConnection and an AdminProtocol implementation. Have the 145 // ClusterConnection return the HRI. Have the HRI return a few mocked up responses 146 // to make our test work. 147 this.connection = 148 HConnectionTestingUtility.getMockedConnectionAndDecorate(getConfiguration(), 149 Mockito.mock(AdminProtos.AdminService.BlockingInterface.class), ri, MOCK_MASTER_SERVERNAME, 150 RegionInfoBuilder.FIRST_META_REGIONINFO); 151 // Set hbase.rootdir into test dir. 152 Path rootdir = CommonFSUtils.getRootDir(getConfiguration()); 153 CommonFSUtils.setRootDir(getConfiguration(), rootdir); 154 } 155 156 public void start(final int numServes, final RSProcedureDispatcher remoteDispatcher) 157 throws IOException, KeeperException { 158 startProcedureExecutor(remoteDispatcher); 159 this.assignmentManager.start(); 160 for (int i = 0; i < numServes; ++i) { 161 ServerName sn = ServerName.valueOf("localhost", 100 + i, 1); 162 serverManager.regionServerReport(sn, ServerMetricsBuilder.of(sn)); 163 } 164 this.procedureExecutor.getEnvironment().setEventReady(initialized, true); 165 } 166 167 /** 168 * Call this restart method only after running MockMasterServices#start() 169 * The RSs can be differentiated by the port number, see 170 * ServerName in MockMasterServices#start() method above. 171 * Restart of region server will have new startcode in server name 172 * 173 * @param serverName Server name to be restarted 174 */ 175 public void restartRegionServer(ServerName serverName) throws IOException { 176 List<ServerName> onlineServers = serverManager.getOnlineServersList(); 177 long startCode = -1; 178 for (ServerName s : onlineServers) { 179 if (s.getAddress().equals(serverName.getAddress())) { 180 startCode = s.getStartcode() + 1; 181 break; 182 } 183 } 184 if (startCode == -1) { 185 return; 186 } 187 ServerName sn = ServerName.valueOf(serverName.getAddress().toString(), startCode); 188 serverManager.regionServerReport(sn, ServerMetricsBuilder.of(sn)); 189 } 190 191 @Override 192 public void stop(String why) { 193 stopProcedureExecutor(); 194 this.assignmentManager.stop(); 195 } 196 197 private void startProcedureExecutor(final RSProcedureDispatcher remoteDispatcher) 198 throws IOException { 199 final Configuration conf = getConfiguration(); 200 this.procedureStore = new NoopProcedureStore(); 201 this.procedureStore.registerListener(new ProcedureStoreListener() { 202 203 @Override 204 public void abortProcess() { 205 abort("The Procedure Store lost the lease", null); 206 } 207 }); 208 209 this.procedureEnv = new MasterProcedureEnv(this, 210 remoteDispatcher != null ? remoteDispatcher : new RSProcedureDispatcher(this)); 211 212 this.procedureExecutor = new ProcedureExecutor<>(conf, procedureEnv, procedureStore, 213 procedureEnv.getProcedureScheduler()); 214 215 final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 216 Math.max(Runtime.getRuntime().availableProcessors(), 217 MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS)); 218 final boolean abortOnCorruption = conf.getBoolean( 219 MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION, 220 MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION); 221 this.procedureStore.start(numThreads); 222 ProcedureTestingUtility.initAndStartWorkers(procedureExecutor, numThreads, abortOnCorruption); 223 this.procedureEnv.getRemoteDispatcher().start(); 224 } 225 226 private void stopProcedureExecutor() { 227 if (this.procedureEnv != null) { 228 this.procedureEnv.getRemoteDispatcher().stop(); 229 } 230 231 if (this.procedureExecutor != null) { 232 this.procedureExecutor.stop(); 233 } 234 235 if (this.procedureStore != null) { 236 this.procedureStore.stop(isAborted()); 237 } 238 } 239 240 @Override 241 public boolean isInitialized() { 242 return true; 243 } 244 245 @Override 246 public ProcedureEvent<?> getInitializedEvent() { 247 return this.initialized; 248 } 249 250 @Override 251 public MasterFileSystem getMasterFileSystem() { 252 return fileSystemManager; 253 } 254 255 @Override 256 public MasterWalManager getMasterWalManager() { 257 return walManager; 258 } 259 260 @Override 261 public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() { 262 return procedureExecutor; 263 } 264 265 @Override 266 public LoadBalancer getLoadBalancer() { 267 return balancer; 268 } 269 270 @Override 271 public ServerManager getServerManager() { 272 return serverManager; 273 } 274 275 @Override 276 public AssignmentManager getAssignmentManager() { 277 return assignmentManager; 278 } 279 280 @Override 281 public TableStateManager getTableStateManager() { 282 return tableStateManager; 283 } 284 285 @Override 286 public ClusterConnection getConnection() { 287 return this.connection; 288 } 289 290 @Override 291 public ServerName getServerName() { 292 return MOCK_MASTER_SERVERNAME; 293 } 294 295 @Override 296 public CoordinatedStateManager getCoordinatedStateManager() { 297 return super.getCoordinatedStateManager(); 298 } 299 300 private static class MockRegionStateStore extends RegionStateStore { 301 public MockRegionStateStore(final MasterServices master) { 302 super(master); 303 } 304 305 @Override 306 public void updateRegionLocation(RegionStateNode regionNode) throws IOException { 307 } 308 } 309 310 @Override 311 public TableDescriptors getTableDescriptors() { 312 return new TableDescriptors() { 313 @Override 314 public TableDescriptor remove(TableName tablename) throws IOException { 315 // noop 316 return null; 317 } 318 319 @Override 320 public Map<String, TableDescriptor> getAll() throws IOException { 321 // noop 322 return null; 323 } 324 325 @Override 326 public TableDescriptor get(TableName tablename) throws IOException { 327 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tablename); 328 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(DEFAULT_COLUMN_FAMILY_NAME)); 329 return builder.build(); 330 } 331 332 @Override 333 public Map<String, TableDescriptor> getByNamespace(String name) throws IOException { 334 return null; 335 } 336 337 @Override 338 public void update(TableDescriptor htd) throws IOException { 339 // noop 340 } 341 342 @Override 343 public void setCacheOn() throws IOException { 344 } 345 346 @Override 347 public void setCacheOff() throws IOException { 348 } 349 }; 350 } 351 352 private static MultiResponse buildMultiResponse(MultiRequest req) { 353 MultiResponse.Builder builder = MultiResponse.newBuilder(); 354 RegionActionResult.Builder regionActionResultBuilder = 355 RegionActionResult.newBuilder(); 356 ResultOrException.Builder roeBuilder = ResultOrException.newBuilder(); 357 for (RegionAction regionAction: req.getRegionActionList()) { 358 regionActionResultBuilder.clear(); 359 for (ClientProtos.Action action: regionAction.getActionList()) { 360 roeBuilder.clear(); 361 roeBuilder.setResult(ClientProtos.Result.getDefaultInstance()); 362 roeBuilder.setIndex(action.getIndex()); 363 regionActionResultBuilder.addResultOrException(roeBuilder.build()); 364 } 365 builder.addRegionActionResult(regionActionResultBuilder.build()); 366 } 367 return builder.build(); 368 } 369}