001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import static org.mockito.ArgumentMatchers.any;
021
022import java.io.IOException;
023import java.util.List;
024import java.util.Map;
025import java.util.NavigableMap;
026import java.util.SortedSet;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.CoordinatedStateManager;
030import org.apache.hadoop.hbase.ServerMetricsBuilder;
031import org.apache.hadoop.hbase.ServerName;
032import org.apache.hadoop.hbase.TableDescriptors;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.ClusterConnection;
035import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
036import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
037import org.apache.hadoop.hbase.client.RegionInfoBuilder;
038import org.apache.hadoop.hbase.client.TableDescriptor;
039import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
040import org.apache.hadoop.hbase.client.TableState;
041import org.apache.hadoop.hbase.master.LoadBalancer;
042import org.apache.hadoop.hbase.master.MasterFileSystem;
043import org.apache.hadoop.hbase.master.MasterServices;
044import org.apache.hadoop.hbase.master.MasterWalManager;
045import org.apache.hadoop.hbase.master.MockNoopMasterServices;
046import org.apache.hadoop.hbase.master.ServerManager;
047import org.apache.hadoop.hbase.master.TableStateManager;
048import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
049import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
050import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
051import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
052import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
053import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
054import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
055import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
056import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
057import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
058import org.apache.hadoop.hbase.security.Superusers;
059import org.apache.hadoop.hbase.util.CommonFSUtils;
060import org.apache.zookeeper.KeeperException;
061import org.mockito.Mockito;
062import org.mockito.invocation.InvocationOnMock;
063import org.mockito.stubbing.Answer;
064
065import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
066
067import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
068import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
069import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
070import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
071import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
072import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
076
077/**
078 * A mocked master services.
079 * Tries to fake it. May not always work.
080 */
081public class MockMasterServices extends MockNoopMasterServices {
082  private final MasterFileSystem fileSystemManager;
083  private final MasterWalManager walManager;
084  private final AssignmentManager assignmentManager;
085  private final TableStateManager tableStateManager;
086
087  private MasterProcedureEnv procedureEnv;
088  private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
089  private ProcedureStore procedureStore;
090  private final ClusterConnection connection;
091  private final LoadBalancer balancer;
092  private final ServerManager serverManager;
093
094  private final ProcedureEvent<?> initialized = new ProcedureEvent<>("master initialized");
095  public static final String DEFAULT_COLUMN_FAMILY_NAME = "cf";
096  public static final ServerName MOCK_MASTER_SERVERNAME =
097      ServerName.valueOf("mockmaster.example.org", 1234, -1L);
098
099  public MockMasterServices(Configuration conf,
100      NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers) throws IOException {
101    super(conf);
102    Superusers.initialize(conf);
103    this.fileSystemManager = new MasterFileSystem(conf);
104    this.walManager = new MasterWalManager(this);
105    // Mock an AM.
106    this.assignmentManager = new AssignmentManager(this, new MockRegionStateStore(this)) {
107      @Override
108      public boolean isTableEnabled(final TableName tableName) {
109        return true;
110      }
111
112      @Override
113      public boolean isTableDisabled(final TableName tableName) {
114        return false;
115      }
116    };
117    this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
118    this.serverManager = new ServerManager(this);
119    this.tableStateManager = Mockito.mock(TableStateManager.class);
120    Mockito.when(this.tableStateManager.getTableState(Mockito.any())).
121        thenReturn(new TableState(TableName.valueOf("AnyTableNameSetInMockMasterServcies"),
122            TableState.State.ENABLED));
123
124    // Mock up a Client Interface
125    ClientProtos.ClientService.BlockingInterface ri =
126        Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
127    MutateResponse.Builder builder = MutateResponse.newBuilder();
128    builder.setProcessed(true);
129    try {
130      Mockito.when(ri.mutate(any(), any())).thenReturn(builder.build());
131    } catch (ServiceException se) {
132      throw ProtobufUtil.handleRemoteException(se);
133    }
134    try {
135      Mockito.when(ri.multi(any(), any())).thenAnswer(new Answer<MultiResponse>() {
136          @Override
137          public MultiResponse answer(InvocationOnMock invocation) throws Throwable {
138            return buildMultiResponse(invocation.getArgument(1));
139          }
140        });
141    } catch (ServiceException se) {
142      throw ProtobufUtil.getRemoteException(se);
143    }
144    // Mock n ClusterConnection and an AdminProtocol implementation. Have the
145    // ClusterConnection return the HRI.  Have the HRI return a few mocked up responses
146    // to make our test work.
147    this.connection =
148        HConnectionTestingUtility.getMockedConnectionAndDecorate(getConfiguration(),
149          Mockito.mock(AdminProtos.AdminService.BlockingInterface.class), ri, MOCK_MASTER_SERVERNAME,
150          RegionInfoBuilder.FIRST_META_REGIONINFO);
151    // Set hbase.rootdir into test dir.
152    Path rootdir = CommonFSUtils.getRootDir(getConfiguration());
153    CommonFSUtils.setRootDir(getConfiguration(), rootdir);
154  }
155
156  public void start(final int numServes, final RSProcedureDispatcher remoteDispatcher)
157      throws IOException, KeeperException {
158    startProcedureExecutor(remoteDispatcher);
159    this.assignmentManager.start();
160    for (int i = 0; i < numServes; ++i) {
161      ServerName sn = ServerName.valueOf("localhost", 100 + i, 1);
162      serverManager.regionServerReport(sn, ServerMetricsBuilder.of(sn));
163    }
164    this.procedureExecutor.getEnvironment().setEventReady(initialized, true);
165  }
166
167  /**
168   * Call this restart method only after running MockMasterServices#start()
169   * The RSs can be differentiated by the port number, see
170   * ServerName in MockMasterServices#start() method above.
171   * Restart of region server will have new startcode in server name
172   *
173   * @param serverName Server name to be restarted
174   */
175  public void restartRegionServer(ServerName serverName) throws IOException {
176    List<ServerName> onlineServers = serverManager.getOnlineServersList();
177    long startCode = -1;
178    for (ServerName s : onlineServers) {
179      if (s.getAddress().equals(serverName.getAddress())) {
180        startCode = s.getStartcode() + 1;
181        break;
182      }
183    }
184    if (startCode == -1) {
185      return;
186    }
187    ServerName sn = ServerName.valueOf(serverName.getAddress().toString(), startCode);
188    serverManager.regionServerReport(sn, ServerMetricsBuilder.of(sn));
189  }
190
191  @Override
192  public void stop(String why) {
193    stopProcedureExecutor();
194    this.assignmentManager.stop();
195  }
196
197  private void startProcedureExecutor(final RSProcedureDispatcher remoteDispatcher)
198      throws IOException {
199    final Configuration conf = getConfiguration();
200    this.procedureStore = new NoopProcedureStore();
201    this.procedureStore.registerListener(new ProcedureStoreListener() {
202
203      @Override
204      public void abortProcess() {
205        abort("The Procedure Store lost the lease", null);
206      }
207    });
208
209    this.procedureEnv = new MasterProcedureEnv(this,
210       remoteDispatcher != null ? remoteDispatcher : new RSProcedureDispatcher(this));
211
212    this.procedureExecutor = new ProcedureExecutor<>(conf, procedureEnv, procedureStore,
213      procedureEnv.getProcedureScheduler());
214
215    final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
216        Math.max(Runtime.getRuntime().availableProcessors(),
217          MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
218    final boolean abortOnCorruption = conf.getBoolean(
219        MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
220        MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
221    this.procedureStore.start(numThreads);
222    ProcedureTestingUtility.initAndStartWorkers(procedureExecutor, numThreads, abortOnCorruption);
223    this.procedureEnv.getRemoteDispatcher().start();
224  }
225
226  private void stopProcedureExecutor() {
227    if (this.procedureEnv != null) {
228      this.procedureEnv.getRemoteDispatcher().stop();
229    }
230
231    if (this.procedureExecutor != null) {
232      this.procedureExecutor.stop();
233    }
234
235    if (this.procedureStore != null) {
236      this.procedureStore.stop(isAborted());
237    }
238  }
239
240  @Override
241  public boolean isInitialized() {
242    return true;
243  }
244
245  @Override
246  public ProcedureEvent<?> getInitializedEvent() {
247    return this.initialized;
248  }
249
250  @Override
251  public MasterFileSystem getMasterFileSystem() {
252    return fileSystemManager;
253  }
254
255  @Override
256  public MasterWalManager getMasterWalManager() {
257    return walManager;
258  }
259
260  @Override
261  public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
262    return procedureExecutor;
263  }
264
265  @Override
266  public LoadBalancer getLoadBalancer() {
267    return balancer;
268  }
269
270  @Override
271  public ServerManager getServerManager() {
272    return serverManager;
273  }
274
275  @Override
276  public AssignmentManager getAssignmentManager() {
277    return assignmentManager;
278  }
279
280  @Override
281  public TableStateManager getTableStateManager() {
282    return tableStateManager;
283  }
284
285  @Override
286  public ClusterConnection getConnection() {
287    return this.connection;
288  }
289
290  @Override
291  public ServerName getServerName() {
292    return MOCK_MASTER_SERVERNAME;
293  }
294
295  @Override
296  public CoordinatedStateManager getCoordinatedStateManager() {
297    return super.getCoordinatedStateManager();
298  }
299
300  private static class MockRegionStateStore extends RegionStateStore {
301    public MockRegionStateStore(final MasterServices master) {
302      super(master);
303    }
304
305    @Override
306    public void updateRegionLocation(RegionStateNode regionNode) throws IOException {
307    }
308  }
309
310  @Override
311  public TableDescriptors getTableDescriptors() {
312    return new TableDescriptors() {
313      @Override
314      public TableDescriptor remove(TableName tablename) throws IOException {
315        // noop
316        return null;
317      }
318
319      @Override
320      public Map<String, TableDescriptor> getAll() throws IOException {
321        // noop
322        return null;
323      }
324
325      @Override
326      public TableDescriptor get(TableName tablename) throws IOException {
327        TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tablename);
328        builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(DEFAULT_COLUMN_FAMILY_NAME));
329        return builder.build();
330      }
331
332      @Override
333      public Map<String, TableDescriptor> getByNamespace(String name) throws IOException {
334        return null;
335      }
336
337      @Override
338      public void update(TableDescriptor htd) throws IOException {
339        // noop
340      }
341
342      @Override
343      public void setCacheOn() throws IOException {
344      }
345
346      @Override
347      public void setCacheOff() throws IOException {
348      }
349    };
350  }
351
352  private static MultiResponse buildMultiResponse(MultiRequest req) {
353    MultiResponse.Builder builder = MultiResponse.newBuilder();
354    RegionActionResult.Builder regionActionResultBuilder =
355        RegionActionResult.newBuilder();
356    ResultOrException.Builder roeBuilder = ResultOrException.newBuilder();
357    for (RegionAction regionAction: req.getRegionActionList()) {
358      regionActionResultBuilder.clear();
359      for (ClientProtos.Action action: regionAction.getActionList()) {
360        roeBuilder.clear();
361        roeBuilder.setResult(ClientProtos.Result.getDefaultInstance());
362        roeBuilder.setIndex(action.getIndex());
363        regionActionResultBuilder.addResultOrException(roeBuilder.build());
364      }
365      builder.addRegionActionResult(regionActionResultBuilder.build());
366    }
367    return builder.build();
368  }
369}