001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.assignment; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; 021import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; 022import static org.mockito.ArgumentMatchers.any; 023import static org.mockito.Mockito.mock; 024import static org.mockito.Mockito.when; 025 026import java.io.IOException; 027import java.util.Collections; 028import java.util.List; 029import java.util.Map; 030import java.util.concurrent.CompletableFuture; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.CoordinatedStateManager; 034import org.apache.hadoop.hbase.ServerMetricsBuilder; 035import org.apache.hadoop.hbase.ServerName; 036import org.apache.hadoop.hbase.TableDescriptors; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 039import org.apache.hadoop.hbase.client.Connection; 040import org.apache.hadoop.hbase.client.HConnectionTestingUtility; 041import org.apache.hadoop.hbase.client.TableDescriptor; 042import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 043import org.apache.hadoop.hbase.client.TableState; 044import org.apache.hadoop.hbase.master.DummyRegionServerList; 045import org.apache.hadoop.hbase.master.LoadBalancer; 046import org.apache.hadoop.hbase.master.MasterFileSystem; 047import org.apache.hadoop.hbase.master.MasterServices; 048import org.apache.hadoop.hbase.master.MasterWalManager; 049import org.apache.hadoop.hbase.master.MockNoopMasterServices; 050import org.apache.hadoop.hbase.master.ServerManager; 051import org.apache.hadoop.hbase.master.SplitWALManager; 052import org.apache.hadoop.hbase.master.TableStateManager; 053import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; 054import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; 055import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 056import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher; 057import org.apache.hadoop.hbase.master.region.MasterRegion; 058import org.apache.hadoop.hbase.master.region.MasterRegionFactory; 059import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; 060import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 061import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 062import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 063import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; 064import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 065import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener; 066import org.apache.hadoop.hbase.replication.ReplicationException; 067import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 068import org.apache.hadoop.hbase.security.Superusers; 069import org.apache.hadoop.hbase.util.CommonFSUtils; 070import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 071import org.apache.zookeeper.KeeperException; 072import org.mockito.invocation.InvocationOnMock; 073import org.mockito.stubbing.Answer; 074 075import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 076 077import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 078import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 081import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 083import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 084import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 085 086/** 087 * A mocked master services. Tries to fake it. May not always work. 088 */ 089public class MockMasterServices extends MockNoopMasterServices { 090 private final MasterFileSystem fileSystemManager; 091 private final MasterWalManager walManager; 092 private final SplitWALManager splitWALManager; 093 private final AssignmentManager assignmentManager; 094 private final TableStateManager tableStateManager; 095 private final MasterRegion masterRegion; 096 097 private MasterProcedureEnv procedureEnv; 098 private ProcedureExecutor<MasterProcedureEnv> procedureExecutor; 099 private ProcedureStore procedureStore; 100 private final Connection connection; 101 private final LoadBalancer balancer; 102 private final ServerManager serverManager; 103 private final ReplicationPeerManager rpm; 104 105 private final ProcedureEvent<?> initialized = new ProcedureEvent<>("master initialized"); 106 public static final String DEFAULT_COLUMN_FAMILY_NAME = "cf"; 107 public static final ServerName MOCK_MASTER_SERVERNAME = 108 ServerName.valueOf("mockmaster.example.org", 1234, -1L); 109 110 public MockMasterServices(Configuration conf) throws IOException, ReplicationException { 111 super(conf); 112 Superusers.initialize(conf); 113 this.fileSystemManager = new MasterFileSystem(conf); 114 this.walManager = new MasterWalManager(this); 115 this.splitWALManager = 116 conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK) 117 ? null 118 : new SplitWALManager(this); 119 this.masterRegion = MasterRegionFactory.create(this); 120 // Mock an AM. 121 this.assignmentManager = 122 new AssignmentManager(this, masterRegion, new MockRegionStateStore(this, masterRegion)); 123 this.balancer = LoadBalancerFactory.getLoadBalancer(conf); 124 this.serverManager = new ServerManager(this, new DummyRegionServerList()); 125 this.tableStateManager = mock(TableStateManager.class); 126 assignmentManager.initializationPostMetaOnline(); 127 when(this.tableStateManager.getTableState(any())).thenReturn(new TableState( 128 TableName.valueOf("AnyTableNameSetInMockMasterServcies"), TableState.State.ENABLED)); 129 130 // Mock up a Client Interface 131 ClientProtos.ClientService.BlockingInterface ri = 132 mock(ClientProtos.ClientService.BlockingInterface.class); 133 MutateResponse.Builder builder = MutateResponse.newBuilder(); 134 builder.setProcessed(true); 135 try { 136 when(ri.mutate(any(), any())).thenReturn(builder.build()); 137 } catch (ServiceException se) { 138 throw ProtobufUtil.handleRemoteException(se); 139 } 140 try { 141 when(ri.multi(any(), any())).thenAnswer(new Answer<MultiResponse>() { 142 @Override 143 public MultiResponse answer(InvocationOnMock invocation) throws Throwable { 144 return buildMultiResponse(invocation.getArgument(1)); 145 } 146 }); 147 } catch (ServiceException se) { 148 throw ProtobufUtil.getRemoteException(se); 149 } 150 this.connection = HConnectionTestingUtility.getMockedConnection(getConfiguration()); 151 // Set hbase.rootdir into test dir. 152 Path rootdir = CommonFSUtils.getRootDir(getConfiguration()); 153 CommonFSUtils.setRootDir(getConfiguration(), rootdir); 154 this.rpm = mock(ReplicationPeerManager.class); 155 ReplicationQueueStorage rqs = mock(ReplicationQueueStorage.class); 156 when(rqs.listAllQueueIds(any(ServerName.class))).thenReturn(Collections.emptyList()); 157 when(rpm.getQueueStorage()).thenReturn(rqs); 158 } 159 160 public void start(final int numServes, final RSProcedureDispatcher remoteDispatcher) 161 throws IOException, KeeperException { 162 startProcedureExecutor(remoteDispatcher); 163 this.assignmentManager.start(); 164 for (int i = 0; i < numServes; ++i) { 165 ServerName sn = ServerName.valueOf("localhost", 100 + i, 1); 166 serverManager.regionServerReport(sn, ServerMetricsBuilder.newBuilder(sn) 167 .setLastReportTimestamp(EnvironmentEdgeManager.currentTime()).build()); 168 } 169 this.procedureExecutor.getEnvironment().setEventReady(initialized, true); 170 } 171 172 /** 173 * Call this restart method only after running MockMasterServices#start() The RSs can be 174 * differentiated by the port number, see ServerName in MockMasterServices#start() method above. 175 * Restart of region server will have new startcode in server name 176 * @param serverName Server name to be restarted 177 */ 178 public void restartRegionServer(ServerName serverName) throws IOException { 179 List<ServerName> onlineServers = serverManager.getOnlineServersList(); 180 long startCode = -1; 181 for (ServerName s : onlineServers) { 182 if (s.getAddress().equals(serverName.getAddress())) { 183 startCode = s.getStartcode() + 1; 184 break; 185 } 186 } 187 if (startCode == -1) { 188 return; 189 } 190 ServerName sn = ServerName.valueOf(serverName.getAddress().toString(), startCode); 191 serverManager.regionServerReport(sn, ServerMetricsBuilder.newBuilder(sn) 192 .setLastReportTimestamp(EnvironmentEdgeManager.currentTime()).build()); 193 } 194 195 @Override 196 public void stop(String why) { 197 stopProcedureExecutor(); 198 this.assignmentManager.stop(); 199 } 200 201 private void startProcedureExecutor(final RSProcedureDispatcher remoteDispatcher) 202 throws IOException { 203 final Configuration conf = getConfiguration(); 204 this.procedureStore = new NoopProcedureStore(); 205 this.procedureStore.registerListener(new ProcedureStoreListener() { 206 207 @Override 208 public void abortProcess() { 209 abort("The Procedure Store lost the lease", null); 210 } 211 }); 212 213 this.procedureEnv = new MasterProcedureEnv(this, 214 remoteDispatcher != null ? remoteDispatcher : new RSProcedureDispatcher(this)); 215 216 this.procedureExecutor = new ProcedureExecutor<>(conf, procedureEnv, procedureStore, 217 procedureEnv.getProcedureScheduler()); 218 219 final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 220 Math.max(Runtime.getRuntime().availableProcessors(), 221 MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS)); 222 final boolean abortOnCorruption = 223 conf.getBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION, 224 MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION); 225 this.procedureStore.start(numThreads); 226 ProcedureTestingUtility.initAndStartWorkers(procedureExecutor, numThreads, abortOnCorruption); 227 this.procedureEnv.getRemoteDispatcher().start(); 228 } 229 230 private void stopProcedureExecutor() { 231 if (this.procedureEnv != null) { 232 this.procedureEnv.getRemoteDispatcher().stop(); 233 } 234 235 if (this.procedureExecutor != null) { 236 this.procedureExecutor.stop(); 237 } 238 239 if (this.procedureStore != null) { 240 this.procedureStore.stop(isAborted()); 241 } 242 } 243 244 @Override 245 public boolean isInitialized() { 246 return true; 247 } 248 249 @Override 250 public ProcedureEvent<?> getInitializedEvent() { 251 return this.initialized; 252 } 253 254 @Override 255 public MasterFileSystem getMasterFileSystem() { 256 return fileSystemManager; 257 } 258 259 @Override 260 public MasterWalManager getMasterWalManager() { 261 return walManager; 262 } 263 264 @Override 265 public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() { 266 return procedureExecutor; 267 } 268 269 @Override 270 public LoadBalancer getLoadBalancer() { 271 return balancer; 272 } 273 274 @Override 275 public ServerManager getServerManager() { 276 return serverManager; 277 } 278 279 @Override 280 public AssignmentManager getAssignmentManager() { 281 return assignmentManager; 282 } 283 284 @Override 285 public TableStateManager getTableStateManager() { 286 return tableStateManager; 287 } 288 289 @Override 290 public Connection getConnection() { 291 return this.connection; 292 } 293 294 @Override 295 public ServerName getServerName() { 296 return MOCK_MASTER_SERVERNAME; 297 } 298 299 @Override 300 public CoordinatedStateManager getCoordinatedStateManager() { 301 return super.getCoordinatedStateManager(); 302 } 303 304 private static class MockRegionStateStore extends RegionStateStore { 305 public MockRegionStateStore(MasterServices master, MasterRegion masterRegion) { 306 super(master, masterRegion); 307 } 308 309 @Override 310 public CompletableFuture<Void> updateRegionLocation(RegionStateNode regionNode) { 311 return CompletableFuture.completedFuture(null); 312 } 313 } 314 315 @Override 316 public TableDescriptors getTableDescriptors() { 317 return new TableDescriptors() { 318 @Override 319 public TableDescriptor remove(TableName tablename) throws IOException { 320 // noop 321 return null; 322 } 323 324 @Override 325 public Map<String, TableDescriptor> getAll() throws IOException { 326 // noop 327 return null; 328 } 329 330 @Override 331 public TableDescriptor get(TableName tablename) throws IOException { 332 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tablename); 333 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(DEFAULT_COLUMN_FAMILY_NAME)); 334 return builder.build(); 335 } 336 337 @Override 338 public Map<String, TableDescriptor> getByNamespace(String name) throws IOException { 339 return null; 340 } 341 342 @Override 343 public void update(TableDescriptor htd, boolean cacheOnly) throws IOException { 344 // noop 345 } 346 }; 347 } 348 349 private static MultiResponse buildMultiResponse(MultiRequest req) { 350 MultiResponse.Builder builder = MultiResponse.newBuilder(); 351 RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder(); 352 ResultOrException.Builder roeBuilder = ResultOrException.newBuilder(); 353 for (RegionAction regionAction : req.getRegionActionList()) { 354 regionActionResultBuilder.clear(); 355 for (ClientProtos.Action action : regionAction.getActionList()) { 356 roeBuilder.clear(); 357 roeBuilder.setResult(ClientProtos.Result.getDefaultInstance()); 358 roeBuilder.setIndex(action.getIndex()); 359 regionActionResultBuilder.addResultOrException(roeBuilder.build()); 360 } 361 builder.addRegionActionResult(regionActionResultBuilder.build()); 362 } 363 return builder.build(); 364 } 365 366 @Override 367 public SplitWALManager getSplitWALManager() { 368 return splitWALManager; 369 } 370 371 @Override 372 public ReplicationPeerManager getReplicationPeerManager() { 373 return rpm; 374 } 375}