001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
020import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
021import static org.mockito.ArgumentMatchers.any;
022
023import java.io.IOException;
024import java.util.List;
025import java.util.Map;
026import java.util.NavigableMap;
027import java.util.SortedSet;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.CoordinatedStateManager;
031import org.apache.hadoop.hbase.ServerMetricsBuilder;
032import org.apache.hadoop.hbase.ServerName;
033import org.apache.hadoop.hbase.TableDescriptors;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.ClusterConnection;
036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
037import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
038import org.apache.hadoop.hbase.client.RegionInfoBuilder;
039import org.apache.hadoop.hbase.client.TableDescriptor;
040import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
041import org.apache.hadoop.hbase.client.TableState;
042import org.apache.hadoop.hbase.master.LoadBalancer;
043import org.apache.hadoop.hbase.master.MasterFileSystem;
044import org.apache.hadoop.hbase.master.MasterServices;
045import org.apache.hadoop.hbase.master.MasterWalManager;
046import org.apache.hadoop.hbase.master.MockNoopMasterServices;
047import org.apache.hadoop.hbase.master.ServerManager;
048import org.apache.hadoop.hbase.master.SplitWALManager;
049import org.apache.hadoop.hbase.master.TableStateManager;
050import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
051import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
052import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
053import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
054import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
055import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
056import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
057import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
058import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
059import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
060import org.apache.hadoop.hbase.security.Superusers;
061import org.apache.hadoop.hbase.util.CommonFSUtils;
062import org.apache.zookeeper.KeeperException;
063import org.mockito.Mockito;
064import org.mockito.invocation.InvocationOnMock;
065import org.mockito.stubbing.Answer;
066
067import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
068
069import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
070import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
071import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
072import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
078
079/**
080 * A mocked master services.
081 * Tries to fake it. May not always work.
082 */
083public class MockMasterServices extends MockNoopMasterServices {
084  private final MasterFileSystem fileSystemManager;
085  private final MasterWalManager walManager;
086  private final SplitWALManager splitWALManager;
087  private final AssignmentManager assignmentManager;
088  private final TableStateManager tableStateManager;
089
090  private MasterProcedureEnv procedureEnv;
091  private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
092  private ProcedureStore procedureStore;
093  private final ClusterConnection connection;
094  private final LoadBalancer balancer;
095  private final ServerManager serverManager;
096
097  private final ProcedureEvent<?> initialized = new ProcedureEvent<>("master initialized");
098  public static final String DEFAULT_COLUMN_FAMILY_NAME = "cf";
099  public static final ServerName MOCK_MASTER_SERVERNAME =
100      ServerName.valueOf("mockmaster.example.org", 1234, -1L);
101
102  public MockMasterServices(Configuration conf,
103      NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers) throws IOException {
104    super(conf);
105    Superusers.initialize(conf);
106    this.fileSystemManager = new MasterFileSystem(conf);
107    this.walManager = new MasterWalManager(this);
108    this.splitWALManager =
109      conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)?
110        null: new SplitWALManager(this);
111
112    // Mock an AM.
113    this.assignmentManager = new AssignmentManager(this, new MockRegionStateStore(this));
114    this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
115    this.serverManager = new ServerManager(this);
116    this.tableStateManager = Mockito.mock(TableStateManager.class);
117    Mockito.when(this.tableStateManager.getTableState(Mockito.any())).
118        thenReturn(new TableState(TableName.valueOf("AnyTableNameSetInMockMasterServcies"),
119            TableState.State.ENABLED));
120
121    // Mock up a Client Interface
122    ClientProtos.ClientService.BlockingInterface ri =
123        Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
124    MutateResponse.Builder builder = MutateResponse.newBuilder();
125    builder.setProcessed(true);
126    try {
127      Mockito.when(ri.mutate(any(), any())).thenReturn(builder.build());
128    } catch (ServiceException se) {
129      throw ProtobufUtil.handleRemoteException(se);
130    }
131    try {
132      Mockito.when(ri.multi(any(), any())).thenAnswer(new Answer<MultiResponse>() {
133          @Override
134          public MultiResponse answer(InvocationOnMock invocation) throws Throwable {
135            return buildMultiResponse(invocation.getArgument(1));
136          }
137        });
138    } catch (ServiceException se) {
139      throw ProtobufUtil.getRemoteException(se);
140    }
141    // Mock n ClusterConnection and an AdminProtocol implementation. Have the
142    // ClusterConnection return the HRI.  Have the HRI return a few mocked up responses
143    // to make our test work.
144    this.connection =
145        HConnectionTestingUtility.getMockedConnectionAndDecorate(getConfiguration(),
146          Mockito.mock(AdminProtos.AdminService.BlockingInterface.class), ri,
147          MOCK_MASTER_SERVERNAME, RegionInfoBuilder.FIRST_META_REGIONINFO);
148    // Set hbase.rootdir into test dir.
149    Path rootdir = CommonFSUtils.getRootDir(getConfiguration());
150    CommonFSUtils.setRootDir(getConfiguration(), rootdir);
151  }
152
153  public void start(final int numServes, final RSProcedureDispatcher remoteDispatcher)
154      throws IOException, KeeperException {
155    startProcedureExecutor(remoteDispatcher);
156    this.assignmentManager.start();
157    for (int i = 0; i < numServes; ++i) {
158      ServerName sn = ServerName.valueOf("localhost", 100 + i, 1);
159      serverManager.regionServerReport(sn, ServerMetricsBuilder.of(sn));
160    }
161    this.procedureExecutor.getEnvironment().setEventReady(initialized, true);
162  }
163
164  /**
165   * Call this restart method only after running MockMasterServices#start()
166   * The RSs can be differentiated by the port number, see
167   * ServerName in MockMasterServices#start() method above.
168   * Restart of region server will have new startcode in server name
169   *
170   * @param serverName Server name to be restarted
171   */
172  public void restartRegionServer(ServerName serverName) throws IOException {
173    List<ServerName> onlineServers = serverManager.getOnlineServersList();
174    long startCode = -1;
175    for (ServerName s : onlineServers) {
176      if (s.getAddress().equals(serverName.getAddress())) {
177        startCode = s.getStartcode() + 1;
178        break;
179      }
180    }
181    if (startCode == -1) {
182      return;
183    }
184    ServerName sn = ServerName.valueOf(serverName.getAddress().toString(), startCode);
185    serverManager.regionServerReport(sn, ServerMetricsBuilder.of(sn));
186  }
187
188  @Override
189  public void stop(String why) {
190    stopProcedureExecutor();
191    this.assignmentManager.stop();
192  }
193
194  private void startProcedureExecutor(final RSProcedureDispatcher remoteDispatcher)
195      throws IOException {
196    final Configuration conf = getConfiguration();
197    this.procedureStore = new NoopProcedureStore();
198    this.procedureStore.registerListener(new ProcedureStoreListener() {
199
200      @Override
201      public void abortProcess() {
202        abort("The Procedure Store lost the lease", null);
203      }
204    });
205
206    this.procedureEnv = new MasterProcedureEnv(this,
207       remoteDispatcher != null ? remoteDispatcher : new RSProcedureDispatcher(this));
208
209    this.procedureExecutor = new ProcedureExecutor<>(conf, procedureEnv, procedureStore,
210      procedureEnv.getProcedureScheduler());
211
212    final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
213        Math.max(Runtime.getRuntime().availableProcessors(),
214          MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
215    final boolean abortOnCorruption = conf.getBoolean(
216        MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
217        MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
218    this.procedureStore.start(numThreads);
219    ProcedureTestingUtility.initAndStartWorkers(procedureExecutor, numThreads, abortOnCorruption);
220    this.procedureEnv.getRemoteDispatcher().start();
221  }
222
223  private void stopProcedureExecutor() {
224    if (this.procedureEnv != null) {
225      this.procedureEnv.getRemoteDispatcher().stop();
226    }
227
228    if (this.procedureExecutor != null) {
229      this.procedureExecutor.stop();
230    }
231
232    if (this.procedureStore != null) {
233      this.procedureStore.stop(isAborted());
234    }
235  }
236
237  @Override
238  public boolean isInitialized() {
239    return true;
240  }
241
242  @Override
243  public ProcedureEvent<?> getInitializedEvent() {
244    return this.initialized;
245  }
246
247  @Override
248  public MasterFileSystem getMasterFileSystem() {
249    return fileSystemManager;
250  }
251
252  @Override
253  public MasterWalManager getMasterWalManager() {
254    return walManager;
255  }
256
257  @Override
258  public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
259    return procedureExecutor;
260  }
261
262  @Override
263  public LoadBalancer getLoadBalancer() {
264    return balancer;
265  }
266
267  @Override
268  public ServerManager getServerManager() {
269    return serverManager;
270  }
271
272  @Override
273  public AssignmentManager getAssignmentManager() {
274    return assignmentManager;
275  }
276
277  @Override
278  public TableStateManager getTableStateManager() {
279    return tableStateManager;
280  }
281
282  @Override
283  public ClusterConnection getConnection() {
284    return this.connection;
285  }
286
287  @Override
288  public ServerName getServerName() {
289    return MOCK_MASTER_SERVERNAME;
290  }
291
292  @Override
293  public CoordinatedStateManager getCoordinatedStateManager() {
294    return super.getCoordinatedStateManager();
295  }
296
297  private static class MockRegionStateStore extends RegionStateStore {
298    public MockRegionStateStore(final MasterServices master) {
299      super(master);
300    }
301
302    @Override
303    public void updateRegionLocation(RegionStateNode regionNode) throws IOException {
304    }
305  }
306
307  @Override
308  public TableDescriptors getTableDescriptors() {
309    return new TableDescriptors() {
310      @Override
311      public TableDescriptor remove(TableName tablename) throws IOException {
312        // noop
313        return null;
314      }
315
316      @Override
317      public Map<String, TableDescriptor> getAll() throws IOException {
318        // noop
319        return null;
320      }
321
322      @Override
323      public TableDescriptor get(TableName tablename) throws IOException {
324        TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tablename);
325        builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(DEFAULT_COLUMN_FAMILY_NAME));
326        return builder.build();
327      }
328
329      @Override
330      public Map<String, TableDescriptor> getByNamespace(String name) throws IOException {
331        return null;
332      }
333
334      @Override
335      public void update(TableDescriptor htd, boolean cacheOnly) throws IOException {
336        // noop
337      }
338    };
339  }
340
341  private static MultiResponse buildMultiResponse(MultiRequest req) {
342    MultiResponse.Builder builder = MultiResponse.newBuilder();
343    RegionActionResult.Builder regionActionResultBuilder =
344        RegionActionResult.newBuilder();
345    ResultOrException.Builder roeBuilder = ResultOrException.newBuilder();
346    for (RegionAction regionAction: req.getRegionActionList()) {
347      regionActionResultBuilder.clear();
348      for (ClientProtos.Action action: regionAction.getActionList()) {
349        roeBuilder.clear();
350        roeBuilder.setResult(ClientProtos.Result.getDefaultInstance());
351        roeBuilder.setIndex(action.getIndex());
352        regionActionResultBuilder.addResultOrException(roeBuilder.build());
353      }
354      builder.addRegionActionResult(regionActionResultBuilder.build());
355    }
356    return builder.build();
357  }
358
359  @Override public SplitWALManager getSplitWALManager() {
360    return splitWALManager;
361  }
362}