001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.assignment; 019 020import static org.mockito.ArgumentMatchers.any; 021 022import java.io.IOException; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Map; 026import java.util.NavigableMap; 027import java.util.SortedSet; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.CoordinatedStateManager; 031import org.apache.hadoop.hbase.ServerLoad; 032import org.apache.hadoop.hbase.ServerMetricsBuilder; 033import org.apache.hadoop.hbase.ServerName; 034import org.apache.hadoop.hbase.TableDescriptors; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.YouAreDeadException; 037import org.apache.hadoop.hbase.client.ClusterConnection; 038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 039import org.apache.hadoop.hbase.client.HConnectionTestingUtility; 040import org.apache.hadoop.hbase.client.RegionInfoBuilder; 041import org.apache.hadoop.hbase.client.TableDescriptor; 042import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 043import org.apache.hadoop.hbase.client.TableState; 044import org.apache.hadoop.hbase.master.LoadBalancer; 045import org.apache.hadoop.hbase.master.MasterFileSystem; 046import org.apache.hadoop.hbase.master.MasterServices; 047import org.apache.hadoop.hbase.master.MasterWalManager; 048import org.apache.hadoop.hbase.master.MockNoopMasterServices; 049import org.apache.hadoop.hbase.master.ServerManager; 050import org.apache.hadoop.hbase.master.TableStateManager; 051import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; 052import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; 053import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 054import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher; 055import org.apache.hadoop.hbase.procedure2.Procedure; 056import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 057import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 058import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 059import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; 060import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 061import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener; 062import org.apache.hadoop.hbase.security.Superusers; 063import org.apache.hadoop.hbase.util.FSUtils; 064import org.apache.zookeeper.KeeperException; 065import org.mockito.Mockito; 066import org.mockito.invocation.InvocationOnMock; 067import org.mockito.stubbing.Answer; 068 069import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 070 071import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 072import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 078import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 080 081 082/** 083 * A mocked master services. 084 * Tries to fake it. May not always work. 085 */ 086public class MockMasterServices extends MockNoopMasterServices { 087 private final MasterFileSystem fileSystemManager; 088 private final MasterWalManager walManager; 089 private final AssignmentManager assignmentManager; 090 private final TableStateManager tableStateManager; 091 092 private MasterProcedureEnv procedureEnv; 093 private ProcedureExecutor<MasterProcedureEnv> procedureExecutor; 094 private ProcedureStore procedureStore; 095 private final ClusterConnection connection; 096 private final LoadBalancer balancer; 097 private final ServerManager serverManager; 098 // Set of regions on a 'server'. Populated externally. Used in below faking 'cluster'. 099 private final NavigableMap<ServerName, SortedSet<byte []>> regionsToRegionServers; 100 101 private final ProcedureEvent initialized = new ProcedureEvent("master initialized"); 102 public static final String DEFAULT_COLUMN_FAMILY_NAME = "cf"; 103 public static final ServerName MOCK_MASTER_SERVERNAME = 104 ServerName.valueOf("mockmaster.example.org", 1234, -1L); 105 106 public MockMasterServices(Configuration conf, 107 NavigableMap<ServerName, SortedSet<byte []>> regionsToRegionServers) 108 throws IOException { 109 super(conf); 110 this.regionsToRegionServers = regionsToRegionServers; 111 Superusers.initialize(conf); 112 this.fileSystemManager = new MasterFileSystem(conf); 113 this.walManager = new MasterWalManager(this); 114 // Mock an AM. 115 this.assignmentManager = new AssignmentManager(this, new MockRegionStateStore(this)) { 116 @Override 117 public boolean isTableEnabled(final TableName tableName) { 118 return true; 119 } 120 121 @Override 122 public boolean isTableDisabled(final TableName tableName) { 123 return false; 124 } 125 126 @Override 127 protected boolean waitServerReportEvent(ServerName serverName, Procedure proc) { 128 // Make a report with current state of the server 'serverName' before we call wait.. 129 SortedSet<byte []> regions = regionsToRegionServers.get(serverName); 130 try { 131 getAssignmentManager().reportOnlineRegions(serverName, 132 regions == null? new HashSet<byte []>(): regions); 133 } catch (YouAreDeadException e) { 134 throw new RuntimeException(e); 135 } 136 return super.waitServerReportEvent(serverName, proc); 137 } 138 }; 139 this.balancer = LoadBalancerFactory.getLoadBalancer(conf); 140 this.serverManager = new ServerManager(this); 141 this.tableStateManager = Mockito.mock(TableStateManager.class); 142 Mockito.when(this.tableStateManager.getTableState(Mockito.any())). 143 thenReturn(new TableState(TableName.valueOf("AnyTableNameSetInMockMasterServcies"), 144 TableState.State.ENABLED)); 145 146 // Mock up a Client Interface 147 ClientProtos.ClientService.BlockingInterface ri = 148 Mockito.mock(ClientProtos.ClientService.BlockingInterface.class); 149 MutateResponse.Builder builder = MutateResponse.newBuilder(); 150 builder.setProcessed(true); 151 try { 152 Mockito.when(ri.mutate(any(), any())).thenReturn(builder.build()); 153 } catch (ServiceException se) { 154 throw ProtobufUtil.handleRemoteException(se); 155 } 156 try { 157 Mockito.when(ri.multi(any(), any())).thenAnswer(new Answer<MultiResponse>() { 158 @Override 159 public MultiResponse answer(InvocationOnMock invocation) throws Throwable { 160 return buildMultiResponse(invocation.getArgument(1)); 161 } 162 }); 163 } catch (ServiceException se) { 164 throw ProtobufUtil.getRemoteException(se); 165 } 166 // Mock n ClusterConnection and an AdminProtocol implementation. Have the 167 // ClusterConnection return the HRI. Have the HRI return a few mocked up responses 168 // to make our test work. 169 this.connection = 170 HConnectionTestingUtility.getMockedConnectionAndDecorate(getConfiguration(), 171 Mockito.mock(AdminProtos.AdminService.BlockingInterface.class), ri, MOCK_MASTER_SERVERNAME, 172 RegionInfoBuilder.FIRST_META_REGIONINFO); 173 // Set hbase.rootdir into test dir. 174 Path rootdir = FSUtils.getRootDir(getConfiguration()); 175 FSUtils.setRootDir(getConfiguration(), rootdir); 176 } 177 178 public void start(final int numServes, final RSProcedureDispatcher remoteDispatcher) 179 throws IOException, KeeperException { 180 startProcedureExecutor(remoteDispatcher); 181 this.assignmentManager.start(); 182 for (int i = 0; i < numServes; ++i) { 183 ServerName sn = ServerName.valueOf("localhost", 100 + i, 1); 184 serverManager.regionServerReport(sn, new ServerLoad(ServerMetricsBuilder.of(sn))); 185 } 186 this.procedureExecutor.getEnvironment().setEventReady(initialized, true); 187 } 188 189 /** 190 * Call this restart method only after running MockMasterServices#start() 191 * The RSs can be differentiated by the port number, see 192 * ServerName in MockMasterServices#start() method above. 193 * Restart of region server will have new startcode in server name 194 * 195 * @param serverName Server name to be restarted 196 */ 197 public void restartRegionServer(ServerName serverName) throws IOException { 198 List<ServerName> onlineServers = serverManager.getOnlineServersList(); 199 long startCode = -1; 200 for (ServerName s : onlineServers) { 201 if (s.getAddress().equals(serverName.getAddress())) { 202 startCode = s.getStartcode() + 1; 203 break; 204 } 205 } 206 if (startCode == -1) { 207 return; 208 } 209 ServerName sn = ServerName.valueOf(serverName.getAddress().toString(), startCode); 210 serverManager.regionServerReport(sn, new ServerLoad(ServerMetricsBuilder.of(sn))); 211 } 212 213 @Override 214 public void stop(String why) { 215 stopProcedureExecutor(); 216 this.assignmentManager.stop(); 217 } 218 219 private void startProcedureExecutor(final RSProcedureDispatcher remoteDispatcher) 220 throws IOException { 221 final Configuration conf = getConfiguration(); 222 this.procedureStore = new NoopProcedureStore(); 223 this.procedureStore.registerListener(new ProcedureStoreListener() { 224 225 @Override 226 public void abortProcess() { 227 abort("The Procedure Store lost the lease", null); 228 } 229 }); 230 231 this.procedureEnv = new MasterProcedureEnv(this, 232 remoteDispatcher != null ? remoteDispatcher : new RSProcedureDispatcher(this)); 233 234 this.procedureExecutor = new ProcedureExecutor<>(conf, procedureEnv, procedureStore, 235 procedureEnv.getProcedureScheduler()); 236 237 final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 238 Math.max(Runtime.getRuntime().availableProcessors(), 239 MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS)); 240 final boolean abortOnCorruption = conf.getBoolean( 241 MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION, 242 MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION); 243 this.procedureStore.start(numThreads); 244 ProcedureTestingUtility.initAndStartWorkers(procedureExecutor, numThreads, abortOnCorruption); 245 this.procedureEnv.getRemoteDispatcher().start(); 246 } 247 248 private void stopProcedureExecutor() { 249 if (this.procedureEnv != null) { 250 this.procedureEnv.getRemoteDispatcher().stop(); 251 } 252 253 if (this.procedureExecutor != null) { 254 this.procedureExecutor.stop(); 255 } 256 257 if (this.procedureStore != null) { 258 this.procedureStore.stop(isAborted()); 259 } 260 } 261 262 @Override 263 public boolean isInitialized() { 264 return true; 265 } 266 267 @Override 268 public ProcedureEvent getInitializedEvent() { 269 return this.initialized; 270 } 271 272 @Override 273 public MasterFileSystem getMasterFileSystem() { 274 return fileSystemManager; 275 } 276 277 @Override 278 public MasterWalManager getMasterWalManager() { 279 return walManager; 280 } 281 282 @Override 283 public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() { 284 return procedureExecutor; 285 } 286 287 @Override 288 public LoadBalancer getLoadBalancer() { 289 return balancer; 290 } 291 292 @Override 293 public ServerManager getServerManager() { 294 return serverManager; 295 } 296 297 @Override 298 public AssignmentManager getAssignmentManager() { 299 return assignmentManager; 300 } 301 302 @Override 303 public TableStateManager getTableStateManager() { 304 return tableStateManager; 305 } 306 307 @Override 308 public ClusterConnection getConnection() { 309 return this.connection; 310 } 311 312 @Override 313 public ServerName getServerName() { 314 return MOCK_MASTER_SERVERNAME; 315 } 316 317 @Override 318 public CoordinatedStateManager getCoordinatedStateManager() { 319 return super.getCoordinatedStateManager(); 320 } 321 322 private static class MockRegionStateStore extends RegionStateStore { 323 public MockRegionStateStore(final MasterServices master) { 324 super(master); 325 } 326 327 @Override 328 public void updateRegionLocation(RegionStates.RegionStateNode regionNode) throws IOException { 329 } 330 } 331 332 @Override 333 public TableDescriptors getTableDescriptors() { 334 return new TableDescriptors() { 335 @Override 336 public TableDescriptor remove(TableName tablename) throws IOException { 337 // noop 338 return null; 339 } 340 341 @Override 342 public Map<String, TableDescriptor> getAll() throws IOException { 343 // noop 344 return null; 345 } 346 347 @Override 348 public TableDescriptor get(TableName tablename) throws IOException { 349 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tablename); 350 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(DEFAULT_COLUMN_FAMILY_NAME)); 351 return builder.build(); 352 } 353 354 @Override 355 public Map<String, TableDescriptor> getByNamespace(String name) throws IOException { 356 return null; 357 } 358 359 @Override 360 public void add(TableDescriptor htd) throws IOException { 361 // noop 362 } 363 364 @Override 365 public void setCacheOn() throws IOException { 366 } 367 368 @Override 369 public void setCacheOff() throws IOException { 370 } 371 }; 372 } 373 374 private static MultiResponse buildMultiResponse(MultiRequest req) { 375 MultiResponse.Builder builder = MultiResponse.newBuilder(); 376 RegionActionResult.Builder regionActionResultBuilder = 377 RegionActionResult.newBuilder(); 378 ResultOrException.Builder roeBuilder = ResultOrException.newBuilder(); 379 for (RegionAction regionAction: req.getRegionActionList()) { 380 regionActionResultBuilder.clear(); 381 for (ClientProtos.Action action: regionAction.getActionList()) { 382 roeBuilder.clear(); 383 roeBuilder.setResult(ClientProtos.Result.getDefaultInstance()); 384 roeBuilder.setIndex(action.getIndex()); 385 regionActionResultBuilder.addResultOrException(roeBuilder.build()); 386 } 387 builder.addRegionActionResult(regionActionResultBuilder.build()); 388 } 389 return builder.build(); 390 } 391}