001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.assignment; 019 020import static org.mockito.ArgumentMatchers.any; 021 022import java.io.IOException; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Map; 026import java.util.NavigableMap; 027import java.util.SortedSet; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.CoordinatedStateManager; 031import org.apache.hadoop.hbase.ServerLoad; 032import org.apache.hadoop.hbase.ServerMetricsBuilder; 033import org.apache.hadoop.hbase.ServerName; 034import org.apache.hadoop.hbase.TableDescriptors; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.YouAreDeadException; 037import org.apache.hadoop.hbase.client.ClusterConnection; 038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 039import org.apache.hadoop.hbase.client.HConnectionTestingUtility; 040import org.apache.hadoop.hbase.client.RegionInfoBuilder; 041import org.apache.hadoop.hbase.client.TableDescriptor; 042import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 043import org.apache.hadoop.hbase.client.TableState; 044import org.apache.hadoop.hbase.master.LoadBalancer; 045import org.apache.hadoop.hbase.master.MasterFileSystem; 046import org.apache.hadoop.hbase.master.MasterServices; 047import org.apache.hadoop.hbase.master.MasterWalManager; 048import org.apache.hadoop.hbase.master.MockNoopMasterServices; 049import org.apache.hadoop.hbase.master.ServerManager; 050import org.apache.hadoop.hbase.master.TableStateManager; 051import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; 052import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; 053import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 054import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher; 055import org.apache.hadoop.hbase.procedure2.Procedure; 056import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 057import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 058import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 059import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; 060import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 061import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener; 062import org.apache.hadoop.hbase.security.Superusers; 063import org.apache.hadoop.hbase.util.FSUtils; 064import org.apache.zookeeper.KeeperException; 065import org.mockito.Mockito; 066import org.mockito.invocation.InvocationOnMock; 067import org.mockito.stubbing.Answer; 068 069import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 070 071import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 072import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 078import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 080 081 082/** 083 * A mocked master services. 084 * Tries to fake it. May not always work. 085 */ 086public class MockMasterServices extends MockNoopMasterServices { 087 private final MasterFileSystem fileSystemManager; 088 private final MasterWalManager walManager; 089 private final AssignmentManager assignmentManager; 090 private final TableStateManager tableStateManager; 091 092 private MasterProcedureEnv procedureEnv; 093 private ProcedureExecutor<MasterProcedureEnv> procedureExecutor; 094 private ProcedureStore procedureStore; 095 private final ClusterConnection connection; 096 private final LoadBalancer balancer; 097 private final ServerManager serverManager; 098 // Set of regions on a 'server'. Populated externally. Used in below faking 'cluster'. 099 private final NavigableMap<ServerName, SortedSet<byte []>> regionsToRegionServers; 100 101 private final ProcedureEvent initialized = new ProcedureEvent("master initialized"); 102 public static final String DEFAULT_COLUMN_FAMILY_NAME = "cf"; 103 public static final ServerName MOCK_MASTER_SERVERNAME = 104 ServerName.valueOf("mockmaster.example.org", 1234, -1L); 105 106 public MockMasterServices(Configuration conf, 107 NavigableMap<ServerName, SortedSet<byte []>> regionsToRegionServers) 108 throws IOException { 109 super(conf); 110 this.regionsToRegionServers = regionsToRegionServers; 111 Superusers.initialize(conf); 112 this.fileSystemManager = new MasterFileSystem(conf); 113 this.walManager = new MasterWalManager(this); 114 // Mock an AM. 115 this.assignmentManager = new AssignmentManager(this, new MockRegionStateStore(this)) { 116 @Override 117 public boolean isTableEnabled(final TableName tableName) { 118 return true; 119 } 120 121 @Override 122 public boolean isTableDisabled(final TableName tableName) { 123 return false; 124 } 125 126 @Override 127 protected boolean waitServerReportEvent(ServerName serverName, Procedure proc) { 128 // Make a report with current state of the server 'serverName' before we call wait.. 129 SortedSet<byte []> regions = regionsToRegionServers.get(serverName); 130 try { 131 getAssignmentManager().reportOnlineRegions(serverName, 132 regions == null? new HashSet<byte []>(): regions); 133 } catch (YouAreDeadException e) { 134 throw new RuntimeException(e); 135 } 136 return super.waitServerReportEvent(serverName, proc); 137 } 138 }; 139 this.balancer = LoadBalancerFactory.getLoadBalancer(conf); 140 this.serverManager = new ServerManager(this); 141 this.tableStateManager = Mockito.mock(TableStateManager.class); 142 Mockito.when(this.tableStateManager.getTableState(Mockito.any())). 143 thenReturn(new TableState(TableName.valueOf("AnyTableNameSetInMockMasterServcies"), 144 TableState.State.ENABLED)); 145 146 // Mock up a Client Interface 147 ClientProtos.ClientService.BlockingInterface ri = 148 Mockito.mock(ClientProtos.ClientService.BlockingInterface.class); 149 MutateResponse.Builder builder = MutateResponse.newBuilder(); 150 builder.setProcessed(true); 151 try { 152 Mockito.when(ri.mutate(any(), any())).thenReturn(builder.build()); 153 } catch (ServiceException se) { 154 throw ProtobufUtil.handleRemoteException(se); 155 } 156 try { 157 Mockito.when(ri.multi(any(), any())).thenAnswer(new Answer<MultiResponse>() { 158 @Override 159 public MultiResponse answer(InvocationOnMock invocation) throws Throwable { 160 return buildMultiResponse(invocation.getArgument(1)); 161 } 162 }); 163 } catch (ServiceException se) { 164 throw ProtobufUtil.getRemoteException(se); 165 } 166 // Mock n ClusterConnection and an AdminProtocol implementation. Have the 167 // ClusterConnection return the HRI. Have the HRI return a few mocked up responses 168 // to make our test work. 169 this.connection = 170 HConnectionTestingUtility.getMockedConnectionAndDecorate(getConfiguration(), 171 Mockito.mock(AdminProtos.AdminService.BlockingInterface.class), ri, MOCK_MASTER_SERVERNAME, 172 RegionInfoBuilder.FIRST_META_REGIONINFO); 173 // Set hbase.rootdir into test dir. 174 Path rootdir = FSUtils.getRootDir(getConfiguration()); 175 FSUtils.setRootDir(getConfiguration(), rootdir); 176 Mockito.mock(AdminProtos.AdminService.BlockingInterface.class); 177 } 178 179 public void start(final int numServes, final RSProcedureDispatcher remoteDispatcher) 180 throws IOException, KeeperException { 181 startProcedureExecutor(remoteDispatcher); 182 this.assignmentManager.start(); 183 for (int i = 0; i < numServes; ++i) { 184 ServerName sn = ServerName.valueOf("localhost", 100 + i, 1); 185 serverManager.regionServerReport(sn, new ServerLoad(ServerMetricsBuilder.of(sn))); 186 } 187 this.procedureExecutor.getEnvironment().setEventReady(initialized, true); 188 } 189 190 /** 191 * Call this restart method only after running MockMasterServices#start() 192 * The RSs can be differentiated by the port number, see 193 * ServerName in MockMasterServices#start() method above. 194 * Restart of region server will have new startcode in server name 195 * 196 * @param serverName Server name to be restarted 197 */ 198 public void restartRegionServer(ServerName serverName) throws IOException { 199 List<ServerName> onlineServers = serverManager.getOnlineServersList(); 200 long startCode = -1; 201 for (ServerName s : onlineServers) { 202 if (s.getAddress().equals(serverName.getAddress())) { 203 startCode = s.getStartcode() + 1; 204 break; 205 } 206 } 207 if (startCode == -1) { 208 return; 209 } 210 ServerName sn = ServerName.valueOf(serverName.getAddress().toString(), startCode); 211 serverManager.regionServerReport(sn, new ServerLoad(ServerMetricsBuilder.of(sn))); 212 } 213 214 @Override 215 public void stop(String why) { 216 stopProcedureExecutor(); 217 this.assignmentManager.stop(); 218 } 219 220 private void startProcedureExecutor(final RSProcedureDispatcher remoteDispatcher) 221 throws IOException { 222 final Configuration conf = getConfiguration(); 223 this.procedureStore = new NoopProcedureStore(); 224 this.procedureStore.registerListener(new ProcedureStoreListener() { 225 226 @Override 227 public void abortProcess() { 228 abort("The Procedure Store lost the lease", null); 229 } 230 }); 231 232 this.procedureEnv = new MasterProcedureEnv(this, 233 remoteDispatcher != null ? remoteDispatcher : new RSProcedureDispatcher(this)); 234 235 this.procedureExecutor = new ProcedureExecutor<>(conf, procedureEnv, procedureStore, 236 procedureEnv.getProcedureScheduler()); 237 238 final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 239 Math.max(Runtime.getRuntime().availableProcessors(), 240 MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS)); 241 final boolean abortOnCorruption = conf.getBoolean( 242 MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION, 243 MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION); 244 this.procedureStore.start(numThreads); 245 ProcedureTestingUtility.initAndStartWorkers(procedureExecutor, numThreads, abortOnCorruption); 246 this.procedureEnv.getRemoteDispatcher().start(); 247 } 248 249 private void stopProcedureExecutor() { 250 if (this.procedureEnv != null) { 251 this.procedureEnv.getRemoteDispatcher().stop(); 252 } 253 254 if (this.procedureExecutor != null) { 255 this.procedureExecutor.stop(); 256 } 257 258 if (this.procedureStore != null) { 259 this.procedureStore.stop(isAborted()); 260 } 261 } 262 263 @Override 264 public boolean isInitialized() { 265 return true; 266 } 267 268 @Override 269 public ProcedureEvent getInitializedEvent() { 270 return this.initialized; 271 } 272 273 @Override 274 public MasterFileSystem getMasterFileSystem() { 275 return fileSystemManager; 276 } 277 278 @Override 279 public MasterWalManager getMasterWalManager() { 280 return walManager; 281 } 282 283 @Override 284 public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() { 285 return procedureExecutor; 286 } 287 288 @Override 289 public LoadBalancer getLoadBalancer() { 290 return balancer; 291 } 292 293 @Override 294 public ServerManager getServerManager() { 295 return serverManager; 296 } 297 298 @Override 299 public AssignmentManager getAssignmentManager() { 300 return assignmentManager; 301 } 302 303 @Override 304 public TableStateManager getTableStateManager() { 305 return tableStateManager; 306 } 307 308 @Override 309 public ClusterConnection getConnection() { 310 return this.connection; 311 } 312 313 @Override 314 public ServerName getServerName() { 315 return MOCK_MASTER_SERVERNAME; 316 } 317 318 @Override 319 public CoordinatedStateManager getCoordinatedStateManager() { 320 return super.getCoordinatedStateManager(); 321 } 322 323 private static class MockRegionStateStore extends RegionStateStore { 324 public MockRegionStateStore(final MasterServices master) { 325 super(master); 326 } 327 328 @Override 329 public void updateRegionLocation(RegionStates.RegionStateNode regionNode) throws IOException { 330 } 331 } 332 333 @Override 334 public TableDescriptors getTableDescriptors() { 335 return new TableDescriptors() { 336 @Override 337 public TableDescriptor remove(TableName tablename) throws IOException { 338 // noop 339 return null; 340 } 341 342 @Override 343 public Map<String, TableDescriptor> getAll() throws IOException { 344 // noop 345 return null; 346 } 347 348 @Override 349 public TableDescriptor get(TableName tablename) throws IOException { 350 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tablename); 351 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(DEFAULT_COLUMN_FAMILY_NAME)); 352 return builder.build(); 353 } 354 355 @Override 356 public Map<String, TableDescriptor> getByNamespace(String name) throws IOException { 357 return null; 358 } 359 360 @Override 361 public void add(TableDescriptor htd) throws IOException { 362 // noop 363 } 364 365 @Override 366 public void setCacheOn() throws IOException { 367 } 368 369 @Override 370 public void setCacheOff() throws IOException { 371 } 372 }; 373 } 374 375 private static MultiResponse buildMultiResponse(MultiRequest req) { 376 MultiResponse.Builder builder = MultiResponse.newBuilder(); 377 RegionActionResult.Builder regionActionResultBuilder = 378 RegionActionResult.newBuilder(); 379 ResultOrException.Builder roeBuilder = ResultOrException.newBuilder(); 380 for (RegionAction regionAction: req.getRegionActionList()) { 381 regionActionResultBuilder.clear(); 382 for (ClientProtos.Action action: regionAction.getActionList()) { 383 roeBuilder.clear(); 384 roeBuilder.setResult(ClientProtos.Result.getDefaultInstance()); 385 roeBuilder.setIndex(action.getIndex()); 386 regionActionResultBuilder.addResultOrException(roeBuilder.build()); 387 } 388 builder.addRegionActionResult(regionActionResultBuilder.build()); 389 } 390 return builder.build(); 391 } 392}