001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import static org.mockito.ArgumentMatchers.any;
021
022import java.io.IOException;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Map;
026import java.util.NavigableMap;
027import java.util.SortedSet;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.CoordinatedStateManager;
031import org.apache.hadoop.hbase.ServerLoad;
032import org.apache.hadoop.hbase.ServerMetricsBuilder;
033import org.apache.hadoop.hbase.ServerName;
034import org.apache.hadoop.hbase.TableDescriptors;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.YouAreDeadException;
037import org.apache.hadoop.hbase.client.ClusterConnection;
038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
039import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
040import org.apache.hadoop.hbase.client.RegionInfoBuilder;
041import org.apache.hadoop.hbase.client.TableDescriptor;
042import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
043import org.apache.hadoop.hbase.client.TableState;
044import org.apache.hadoop.hbase.master.LoadBalancer;
045import org.apache.hadoop.hbase.master.MasterFileSystem;
046import org.apache.hadoop.hbase.master.MasterServices;
047import org.apache.hadoop.hbase.master.MasterWalManager;
048import org.apache.hadoop.hbase.master.MockNoopMasterServices;
049import org.apache.hadoop.hbase.master.ServerManager;
050import org.apache.hadoop.hbase.master.TableStateManager;
051import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
052import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
053import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
054import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
055import org.apache.hadoop.hbase.procedure2.Procedure;
056import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
057import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
058import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
059import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
060import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
061import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
062import org.apache.hadoop.hbase.security.Superusers;
063import org.apache.hadoop.hbase.util.FSUtils;
064import org.apache.zookeeper.KeeperException;
065import org.mockito.Mockito;
066import org.mockito.invocation.InvocationOnMock;
067import org.mockito.stubbing.Answer;
068
069import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
070
071import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
072import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
080
081
082/**
083 * A mocked master services.
084 * Tries to fake it. May not always work.
085 */
086public class MockMasterServices extends MockNoopMasterServices {
087  private final MasterFileSystem fileSystemManager;
088  private final MasterWalManager walManager;
089  private final AssignmentManager assignmentManager;
090  private final TableStateManager tableStateManager;
091
092  private MasterProcedureEnv procedureEnv;
093  private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
094  private ProcedureStore procedureStore;
095  private final ClusterConnection connection;
096  private final LoadBalancer balancer;
097  private final ServerManager serverManager;
098  // Set of regions on a 'server'. Populated externally. Used in below faking 'cluster'.
099  private final NavigableMap<ServerName, SortedSet<byte []>> regionsToRegionServers;
100
101  private final ProcedureEvent initialized = new ProcedureEvent("master initialized");
102  public static final String DEFAULT_COLUMN_FAMILY_NAME = "cf";
103  public static final ServerName MOCK_MASTER_SERVERNAME =
104      ServerName.valueOf("mockmaster.example.org", 1234, -1L);
105
106  public MockMasterServices(Configuration conf,
107      NavigableMap<ServerName, SortedSet<byte []>> regionsToRegionServers)
108  throws IOException {
109    super(conf);
110    this.regionsToRegionServers = regionsToRegionServers;
111    Superusers.initialize(conf);
112    this.fileSystemManager = new MasterFileSystem(conf);
113    this.walManager = new MasterWalManager(this);
114    // Mock an AM.
115    this.assignmentManager = new AssignmentManager(this, new MockRegionStateStore(this)) {
116      @Override
117      public boolean isTableEnabled(final TableName tableName) {
118        return true;
119      }
120
121      @Override
122      public boolean isTableDisabled(final TableName tableName) {
123        return false;
124      }
125
126      @Override
127      protected boolean waitServerReportEvent(ServerName serverName, Procedure proc) {
128        // Make a report with current state of the server 'serverName' before we call wait..
129        SortedSet<byte []> regions = regionsToRegionServers.get(serverName);
130        try {
131          getAssignmentManager().reportOnlineRegions(serverName,
132              regions == null? new HashSet<byte []>(): regions);
133        } catch (YouAreDeadException e) {
134          throw new RuntimeException(e);
135        }
136        return super.waitServerReportEvent(serverName, proc);
137      }
138    };
139    this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
140    this.serverManager = new ServerManager(this);
141    this.tableStateManager = Mockito.mock(TableStateManager.class);
142    Mockito.when(this.tableStateManager.getTableState(Mockito.any())).
143        thenReturn(new TableState(TableName.valueOf("AnyTableNameSetInMockMasterServcies"),
144            TableState.State.ENABLED));
145
146    // Mock up a Client Interface
147    ClientProtos.ClientService.BlockingInterface ri =
148        Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
149    MutateResponse.Builder builder = MutateResponse.newBuilder();
150    builder.setProcessed(true);
151    try {
152      Mockito.when(ri.mutate(any(), any())).thenReturn(builder.build());
153    } catch (ServiceException se) {
154      throw ProtobufUtil.handleRemoteException(se);
155    }
156    try {
157      Mockito.when(ri.multi(any(), any())).thenAnswer(new Answer<MultiResponse>() {
158          @Override
159          public MultiResponse answer(InvocationOnMock invocation) throws Throwable {
160            return buildMultiResponse(invocation.getArgument(1));
161          }
162        });
163    } catch (ServiceException se) {
164      throw ProtobufUtil.getRemoteException(se);
165    }
166    // Mock n ClusterConnection and an AdminProtocol implementation. Have the
167    // ClusterConnection return the HRI.  Have the HRI return a few mocked up responses
168    // to make our test work.
169    this.connection =
170        HConnectionTestingUtility.getMockedConnectionAndDecorate(getConfiguration(),
171          Mockito.mock(AdminProtos.AdminService.BlockingInterface.class), ri, MOCK_MASTER_SERVERNAME,
172          RegionInfoBuilder.FIRST_META_REGIONINFO);
173    // Set hbase.rootdir into test dir.
174    Path rootdir = FSUtils.getRootDir(getConfiguration());
175    FSUtils.setRootDir(getConfiguration(), rootdir);
176    Mockito.mock(AdminProtos.AdminService.BlockingInterface.class);
177  }
178
179  public void start(final int numServes, final RSProcedureDispatcher remoteDispatcher)
180      throws IOException, KeeperException {
181    startProcedureExecutor(remoteDispatcher);
182    this.assignmentManager.start();
183    for (int i = 0; i < numServes; ++i) {
184      ServerName sn = ServerName.valueOf("localhost", 100 + i, 1);
185      serverManager.regionServerReport(sn, new ServerLoad(ServerMetricsBuilder.of(sn)));
186    }
187    this.procedureExecutor.getEnvironment().setEventReady(initialized, true);
188  }
189
190  /**
191   * Call this restart method only after running MockMasterServices#start()
192   * The RSs can be differentiated by the port number, see
193   * ServerName in MockMasterServices#start() method above.
194   * Restart of region server will have new startcode in server name
195   *
196   * @param serverName Server name to be restarted
197   */
198  public void restartRegionServer(ServerName serverName) throws IOException {
199    List<ServerName> onlineServers = serverManager.getOnlineServersList();
200    long startCode = -1;
201    for (ServerName s : onlineServers) {
202      if (s.getAddress().equals(serverName.getAddress())) {
203        startCode = s.getStartcode() + 1;
204        break;
205      }
206    }
207    if (startCode == -1) {
208      return;
209    }
210    ServerName sn = ServerName.valueOf(serverName.getAddress().toString(), startCode);
211    serverManager.regionServerReport(sn, new ServerLoad(ServerMetricsBuilder.of(sn)));
212  }
213
214  @Override
215  public void stop(String why) {
216    stopProcedureExecutor();
217    this.assignmentManager.stop();
218  }
219
220  private void startProcedureExecutor(final RSProcedureDispatcher remoteDispatcher)
221      throws IOException {
222    final Configuration conf = getConfiguration();
223    this.procedureStore = new NoopProcedureStore();
224    this.procedureStore.registerListener(new ProcedureStoreListener() {
225
226      @Override
227      public void abortProcess() {
228        abort("The Procedure Store lost the lease", null);
229      }
230    });
231
232    this.procedureEnv = new MasterProcedureEnv(this,
233       remoteDispatcher != null ? remoteDispatcher : new RSProcedureDispatcher(this));
234
235    this.procedureExecutor = new ProcedureExecutor<>(conf, procedureEnv, procedureStore,
236      procedureEnv.getProcedureScheduler());
237
238    final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
239        Math.max(Runtime.getRuntime().availableProcessors(),
240          MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
241    final boolean abortOnCorruption = conf.getBoolean(
242        MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
243        MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
244    this.procedureStore.start(numThreads);
245    ProcedureTestingUtility.initAndStartWorkers(procedureExecutor, numThreads, abortOnCorruption);
246    this.procedureEnv.getRemoteDispatcher().start();
247  }
248
249  private void stopProcedureExecutor() {
250    if (this.procedureEnv != null) {
251      this.procedureEnv.getRemoteDispatcher().stop();
252    }
253
254    if (this.procedureExecutor != null) {
255      this.procedureExecutor.stop();
256    }
257
258    if (this.procedureStore != null) {
259      this.procedureStore.stop(isAborted());
260    }
261  }
262
263  @Override
264  public boolean isInitialized() {
265    return true;
266  }
267
268  @Override
269  public ProcedureEvent getInitializedEvent() {
270    return this.initialized;
271  }
272
273  @Override
274  public MasterFileSystem getMasterFileSystem() {
275    return fileSystemManager;
276  }
277
278  @Override
279  public MasterWalManager getMasterWalManager() {
280    return walManager;
281  }
282
283  @Override
284  public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
285    return procedureExecutor;
286  }
287
288  @Override
289  public LoadBalancer getLoadBalancer() {
290    return balancer;
291  }
292
293  @Override
294  public ServerManager getServerManager() {
295    return serverManager;
296  }
297
298  @Override
299  public AssignmentManager getAssignmentManager() {
300    return assignmentManager;
301  }
302
303  @Override
304  public TableStateManager getTableStateManager() {
305    return tableStateManager;
306  }
307
308  @Override
309  public ClusterConnection getConnection() {
310    return this.connection;
311  }
312
313  @Override
314  public ServerName getServerName() {
315    return MOCK_MASTER_SERVERNAME;
316  }
317
318  @Override
319  public CoordinatedStateManager getCoordinatedStateManager() {
320    return super.getCoordinatedStateManager();
321  }
322
323  private static class MockRegionStateStore extends RegionStateStore {
324    public MockRegionStateStore(final MasterServices master) {
325      super(master);
326    }
327
328    @Override
329    public void updateRegionLocation(RegionStates.RegionStateNode regionNode) throws IOException {
330    }
331  }
332
333  @Override
334  public TableDescriptors getTableDescriptors() {
335    return new TableDescriptors() {
336      @Override
337      public TableDescriptor remove(TableName tablename) throws IOException {
338        // noop
339        return null;
340      }
341
342      @Override
343      public Map<String, TableDescriptor> getAll() throws IOException {
344        // noop
345        return null;
346      }
347
348      @Override
349      public TableDescriptor get(TableName tablename) throws IOException {
350        TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tablename);
351        builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(DEFAULT_COLUMN_FAMILY_NAME));
352        return builder.build();
353      }
354
355      @Override
356      public Map<String, TableDescriptor> getByNamespace(String name) throws IOException {
357        return null;
358      }
359
360      @Override
361      public void add(TableDescriptor htd) throws IOException {
362        // noop
363      }
364
365      @Override
366      public void setCacheOn() throws IOException {
367      }
368
369      @Override
370      public void setCacheOff() throws IOException {
371      }
372    };
373  }
374
375  private static MultiResponse buildMultiResponse(MultiRequest req) {
376    MultiResponse.Builder builder = MultiResponse.newBuilder();
377    RegionActionResult.Builder regionActionResultBuilder =
378        RegionActionResult.newBuilder();
379    ResultOrException.Builder roeBuilder = ResultOrException.newBuilder();
380    for (RegionAction regionAction: req.getRegionActionList()) {
381      regionActionResultBuilder.clear();
382      for (ClientProtos.Action action: regionAction.getActionList()) {
383        roeBuilder.clear();
384        roeBuilder.setResult(ClientProtos.Result.getDefaultInstance());
385        roeBuilder.setIndex(action.getIndex());
386        regionActionResultBuilder.addResultOrException(roeBuilder.build());
387      }
388      builder.addRegionActionResult(regionActionResultBuilder.build());
389    }
390    return builder.build();
391  }
392}