001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import static org.mockito.ArgumentMatchers.any;
021
022import java.io.IOException;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Map;
026import java.util.NavigableMap;
027import java.util.SortedSet;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.CoordinatedStateManager;
031import org.apache.hadoop.hbase.ServerLoad;
032import org.apache.hadoop.hbase.ServerMetricsBuilder;
033import org.apache.hadoop.hbase.ServerName;
034import org.apache.hadoop.hbase.TableDescriptors;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.YouAreDeadException;
037import org.apache.hadoop.hbase.client.ClusterConnection;
038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
039import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
040import org.apache.hadoop.hbase.client.RegionInfoBuilder;
041import org.apache.hadoop.hbase.client.TableDescriptor;
042import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
043import org.apache.hadoop.hbase.client.TableState;
044import org.apache.hadoop.hbase.master.LoadBalancer;
045import org.apache.hadoop.hbase.master.MasterFileSystem;
046import org.apache.hadoop.hbase.master.MasterServices;
047import org.apache.hadoop.hbase.master.MasterWalManager;
048import org.apache.hadoop.hbase.master.MockNoopMasterServices;
049import org.apache.hadoop.hbase.master.ServerManager;
050import org.apache.hadoop.hbase.master.TableStateManager;
051import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
052import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
053import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
054import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
055import org.apache.hadoop.hbase.procedure2.Procedure;
056import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
057import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
058import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
059import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
060import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
061import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
062import org.apache.hadoop.hbase.security.Superusers;
063import org.apache.hadoop.hbase.util.FSUtils;
064import org.apache.zookeeper.KeeperException;
065import org.mockito.Mockito;
066import org.mockito.invocation.InvocationOnMock;
067import org.mockito.stubbing.Answer;
068
069import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
070
071import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
072import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
080
081
082/**
083 * A mocked master services.
084 * Tries to fake it. May not always work.
085 */
086public class MockMasterServices extends MockNoopMasterServices {
087  private final MasterFileSystem fileSystemManager;
088  private final MasterWalManager walManager;
089  private final AssignmentManager assignmentManager;
090  private final TableStateManager tableStateManager;
091
092  private MasterProcedureEnv procedureEnv;
093  private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
094  private ProcedureStore procedureStore;
095  private final ClusterConnection connection;
096  private final LoadBalancer balancer;
097  private final ServerManager serverManager;
098  // Set of regions on a 'server'. Populated externally. Used in below faking 'cluster'.
099  private final NavigableMap<ServerName, SortedSet<byte []>> regionsToRegionServers;
100
101  private final ProcedureEvent initialized = new ProcedureEvent("master initialized");
102  public static final String DEFAULT_COLUMN_FAMILY_NAME = "cf";
103  public static final ServerName MOCK_MASTER_SERVERNAME =
104      ServerName.valueOf("mockmaster.example.org", 1234, -1L);
105
106  public MockMasterServices(Configuration conf,
107      NavigableMap<ServerName, SortedSet<byte []>> regionsToRegionServers)
108  throws IOException {
109    super(conf);
110    this.regionsToRegionServers = regionsToRegionServers;
111    Superusers.initialize(conf);
112    this.fileSystemManager = new MasterFileSystem(conf);
113    this.walManager = new MasterWalManager(this);
114    // Mock an AM.
115    this.assignmentManager = new AssignmentManager(this, new MockRegionStateStore(this)) {
116      @Override
117      public boolean isTableEnabled(final TableName tableName) {
118        return true;
119      }
120
121      @Override
122      public boolean isTableDisabled(final TableName tableName) {
123        return false;
124      }
125
126      @Override
127      protected boolean waitServerReportEvent(ServerName serverName, Procedure proc) {
128        // Make a report with current state of the server 'serverName' before we call wait..
129        SortedSet<byte []> regions = regionsToRegionServers.get(serverName);
130        try {
131          getAssignmentManager().reportOnlineRegions(serverName,
132              regions == null? new HashSet<byte []>(): regions);
133        } catch (YouAreDeadException e) {
134          throw new RuntimeException(e);
135        }
136        return super.waitServerReportEvent(serverName, proc);
137      }
138    };
139    this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
140    this.serverManager = new ServerManager(this);
141    this.tableStateManager = Mockito.mock(TableStateManager.class);
142    Mockito.when(this.tableStateManager.getTableState(Mockito.any())).
143        thenReturn(new TableState(TableName.valueOf("AnyTableNameSetInMockMasterServcies"),
144            TableState.State.ENABLED));
145
146    // Mock up a Client Interface
147    ClientProtos.ClientService.BlockingInterface ri =
148        Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
149    MutateResponse.Builder builder = MutateResponse.newBuilder();
150    builder.setProcessed(true);
151    try {
152      Mockito.when(ri.mutate(any(), any())).thenReturn(builder.build());
153    } catch (ServiceException se) {
154      throw ProtobufUtil.handleRemoteException(se);
155    }
156    try {
157      Mockito.when(ri.multi(any(), any())).thenAnswer(new Answer<MultiResponse>() {
158          @Override
159          public MultiResponse answer(InvocationOnMock invocation) throws Throwable {
160            return buildMultiResponse(invocation.getArgument(1));
161          }
162        });
163    } catch (ServiceException se) {
164      throw ProtobufUtil.getRemoteException(se);
165    }
166    // Mock n ClusterConnection and an AdminProtocol implementation. Have the
167    // ClusterConnection return the HRI.  Have the HRI return a few mocked up responses
168    // to make our test work.
169    this.connection =
170        HConnectionTestingUtility.getMockedConnectionAndDecorate(getConfiguration(),
171          Mockito.mock(AdminProtos.AdminService.BlockingInterface.class), ri, MOCK_MASTER_SERVERNAME,
172          RegionInfoBuilder.FIRST_META_REGIONINFO);
173    // Set hbase.rootdir into test dir.
174    Path rootdir = FSUtils.getRootDir(getConfiguration());
175    FSUtils.setRootDir(getConfiguration(), rootdir);
176  }
177
178  public void start(final int numServes, final RSProcedureDispatcher remoteDispatcher)
179      throws IOException, KeeperException {
180    startProcedureExecutor(remoteDispatcher);
181    this.assignmentManager.start();
182    for (int i = 0; i < numServes; ++i) {
183      ServerName sn = ServerName.valueOf("localhost", 100 + i, 1);
184      serverManager.regionServerReport(sn, new ServerLoad(ServerMetricsBuilder.of(sn)));
185    }
186    this.procedureExecutor.getEnvironment().setEventReady(initialized, true);
187  }
188
189  /**
190   * Call this restart method only after running MockMasterServices#start()
191   * The RSs can be differentiated by the port number, see
192   * ServerName in MockMasterServices#start() method above.
193   * Restart of region server will have new startcode in server name
194   *
195   * @param serverName Server name to be restarted
196   */
197  public void restartRegionServer(ServerName serverName) throws IOException {
198    List<ServerName> onlineServers = serverManager.getOnlineServersList();
199    long startCode = -1;
200    for (ServerName s : onlineServers) {
201      if (s.getAddress().equals(serverName.getAddress())) {
202        startCode = s.getStartcode() + 1;
203        break;
204      }
205    }
206    if (startCode == -1) {
207      return;
208    }
209    ServerName sn = ServerName.valueOf(serverName.getAddress().toString(), startCode);
210    serverManager.regionServerReport(sn, new ServerLoad(ServerMetricsBuilder.of(sn)));
211  }
212
213  @Override
214  public void stop(String why) {
215    stopProcedureExecutor();
216    this.assignmentManager.stop();
217  }
218
219  private void startProcedureExecutor(final RSProcedureDispatcher remoteDispatcher)
220      throws IOException {
221    final Configuration conf = getConfiguration();
222    this.procedureStore = new NoopProcedureStore();
223    this.procedureStore.registerListener(new ProcedureStoreListener() {
224
225      @Override
226      public void abortProcess() {
227        abort("The Procedure Store lost the lease", null);
228      }
229    });
230
231    this.procedureEnv = new MasterProcedureEnv(this,
232       remoteDispatcher != null ? remoteDispatcher : new RSProcedureDispatcher(this));
233
234    this.procedureExecutor = new ProcedureExecutor<>(conf, procedureEnv, procedureStore,
235      procedureEnv.getProcedureScheduler());
236
237    final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
238        Math.max(Runtime.getRuntime().availableProcessors(),
239          MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
240    final boolean abortOnCorruption = conf.getBoolean(
241        MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
242        MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
243    this.procedureStore.start(numThreads);
244    ProcedureTestingUtility.initAndStartWorkers(procedureExecutor, numThreads, abortOnCorruption);
245    this.procedureEnv.getRemoteDispatcher().start();
246  }
247
248  private void stopProcedureExecutor() {
249    if (this.procedureEnv != null) {
250      this.procedureEnv.getRemoteDispatcher().stop();
251    }
252
253    if (this.procedureExecutor != null) {
254      this.procedureExecutor.stop();
255    }
256
257    if (this.procedureStore != null) {
258      this.procedureStore.stop(isAborted());
259    }
260  }
261
262  @Override
263  public boolean isInitialized() {
264    return true;
265  }
266
267  @Override
268  public ProcedureEvent getInitializedEvent() {
269    return this.initialized;
270  }
271
272  @Override
273  public MasterFileSystem getMasterFileSystem() {
274    return fileSystemManager;
275  }
276
277  @Override
278  public MasterWalManager getMasterWalManager() {
279    return walManager;
280  }
281
282  @Override
283  public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
284    return procedureExecutor;
285  }
286
287  @Override
288  public LoadBalancer getLoadBalancer() {
289    return balancer;
290  }
291
292  @Override
293  public ServerManager getServerManager() {
294    return serverManager;
295  }
296
297  @Override
298  public AssignmentManager getAssignmentManager() {
299    return assignmentManager;
300  }
301
302  @Override
303  public TableStateManager getTableStateManager() {
304    return tableStateManager;
305  }
306
307  @Override
308  public ClusterConnection getConnection() {
309    return this.connection;
310  }
311
312  @Override
313  public ServerName getServerName() {
314    return MOCK_MASTER_SERVERNAME;
315  }
316
317  @Override
318  public CoordinatedStateManager getCoordinatedStateManager() {
319    return super.getCoordinatedStateManager();
320  }
321
322  private static class MockRegionStateStore extends RegionStateStore {
323    public MockRegionStateStore(final MasterServices master) {
324      super(master);
325    }
326
327    @Override
328    public void updateRegionLocation(RegionStates.RegionStateNode regionNode) throws IOException {
329    }
330  }
331
332  @Override
333  public TableDescriptors getTableDescriptors() {
334    return new TableDescriptors() {
335      @Override
336      public TableDescriptor remove(TableName tablename) throws IOException {
337        // noop
338        return null;
339      }
340
341      @Override
342      public Map<String, TableDescriptor> getAll() throws IOException {
343        // noop
344        return null;
345      }
346
347      @Override
348      public TableDescriptor get(TableName tablename) throws IOException {
349        TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tablename);
350        builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(DEFAULT_COLUMN_FAMILY_NAME));
351        return builder.build();
352      }
353
354      @Override
355      public Map<String, TableDescriptor> getByNamespace(String name) throws IOException {
356        return null;
357      }
358
359      @Override
360      public void add(TableDescriptor htd) throws IOException {
361        // noop
362      }
363
364      @Override
365      public void setCacheOn() throws IOException {
366      }
367
368      @Override
369      public void setCacheOff() throws IOException {
370      }
371    };
372  }
373
374  private static MultiResponse buildMultiResponse(MultiRequest req) {
375    MultiResponse.Builder builder = MultiResponse.newBuilder();
376    RegionActionResult.Builder regionActionResultBuilder =
377        RegionActionResult.newBuilder();
378    ResultOrException.Builder roeBuilder = ResultOrException.newBuilder();
379    for (RegionAction regionAction: req.getRegionActionList()) {
380      regionActionResultBuilder.clear();
381      for (ClientProtos.Action action: regionAction.getActionList()) {
382        roeBuilder.clear();
383        roeBuilder.setResult(ClientProtos.Result.getDefaultInstance());
384        roeBuilder.setIndex(action.getIndex());
385        regionActionResultBuilder.addResultOrException(roeBuilder.build());
386      }
387      builder.addRegionActionResult(regionActionResultBuilder.build());
388    }
389    return builder.build();
390  }
391}