001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
020import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
021import static org.mockito.ArgumentMatchers.any;
022
023import java.io.IOException;
024import java.util.List;
025import java.util.Map;
026import java.util.NavigableMap;
027import java.util.SortedSet;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.CoordinatedStateManager;
031import org.apache.hadoop.hbase.ServerMetricsBuilder;
032import org.apache.hadoop.hbase.ServerName;
033import org.apache.hadoop.hbase.TableDescriptors;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
036import org.apache.hadoop.hbase.client.Connection;
037import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
038import org.apache.hadoop.hbase.client.TableDescriptor;
039import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
040import org.apache.hadoop.hbase.client.TableState;
041import org.apache.hadoop.hbase.master.LoadBalancer;
042import org.apache.hadoop.hbase.master.MasterFileSystem;
043import org.apache.hadoop.hbase.master.MasterServices;
044import org.apache.hadoop.hbase.master.MasterWalManager;
045import org.apache.hadoop.hbase.master.MockNoopMasterServices;
046import org.apache.hadoop.hbase.master.ServerManager;
047import org.apache.hadoop.hbase.master.SplitWALManager;
048import org.apache.hadoop.hbase.master.TableStateManager;
049import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
050import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
051import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
052import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
053import org.apache.hadoop.hbase.master.region.MasterRegion;
054import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
055import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
056import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
057import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
058import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
059import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
060import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
061import org.apache.hadoop.hbase.security.Superusers;
062import org.apache.hadoop.hbase.util.CommonFSUtils;
063import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
064import org.apache.zookeeper.KeeperException;
065import org.mockito.Mockito;
066import org.mockito.invocation.InvocationOnMock;
067import org.mockito.stubbing.Answer;
068
069import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
070
071import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
072import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
079
080/**
081 * A mocked master services.
082 * Tries to fake it. May not always work.
083 */
084public class MockMasterServices extends MockNoopMasterServices {
085  private final MasterFileSystem fileSystemManager;
086  private final MasterWalManager walManager;
087  private final SplitWALManager splitWALManager;
088  private final AssignmentManager assignmentManager;
089  private final TableStateManager tableStateManager;
090  private final MasterRegion masterRegion;
091
092  private MasterProcedureEnv procedureEnv;
093  private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
094  private ProcedureStore procedureStore;
095  private final Connection connection;
096  private final LoadBalancer balancer;
097  private final ServerManager serverManager;
098
099  private final ProcedureEvent<?> initialized = new ProcedureEvent<>("master initialized");
100  public static final String DEFAULT_COLUMN_FAMILY_NAME = "cf";
101  public static final ServerName MOCK_MASTER_SERVERNAME =
102      ServerName.valueOf("mockmaster.example.org", 1234, -1L);
103
104  public MockMasterServices(Configuration conf,
105      NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers) throws IOException {
106    super(conf);
107    Superusers.initialize(conf);
108    this.fileSystemManager = new MasterFileSystem(conf);
109    this.walManager = new MasterWalManager(this);
110    this.splitWALManager =
111      conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)?
112        null: new SplitWALManager(this);
113    this.masterRegion = MasterRegionFactory.create(this);
114    // Mock an AM.
115    this.assignmentManager =
116      new AssignmentManager(this, masterRegion, new MockRegionStateStore(this, masterRegion));
117    this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
118    this.serverManager = new ServerManager(this);
119    this.tableStateManager = Mockito.mock(TableStateManager.class);
120    Mockito.when(this.tableStateManager.getTableState(Mockito.any())).
121        thenReturn(new TableState(TableName.valueOf("AnyTableNameSetInMockMasterServcies"),
122            TableState.State.ENABLED));
123
124    // Mock up a Client Interface
125    ClientProtos.ClientService.BlockingInterface ri =
126        Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
127    MutateResponse.Builder builder = MutateResponse.newBuilder();
128    builder.setProcessed(true);
129    try {
130      Mockito.when(ri.mutate(any(), any())).thenReturn(builder.build());
131    } catch (ServiceException se) {
132      throw ProtobufUtil.handleRemoteException(se);
133    }
134    try {
135      Mockito.when(ri.multi(any(), any())).thenAnswer(new Answer<MultiResponse>() {
136          @Override
137          public MultiResponse answer(InvocationOnMock invocation) throws Throwable {
138            return buildMultiResponse(invocation.getArgument(1));
139          }
140        });
141    } catch (ServiceException se) {
142      throw ProtobufUtil.getRemoteException(se);
143    }
144    this.connection = HConnectionTestingUtility.getMockedConnection(getConfiguration());
145    // Set hbase.rootdir into test dir.
146    Path rootdir = CommonFSUtils.getRootDir(getConfiguration());
147    CommonFSUtils.setRootDir(getConfiguration(), rootdir);
148  }
149
150  public void start(final int numServes, final RSProcedureDispatcher remoteDispatcher)
151      throws IOException, KeeperException {
152    startProcedureExecutor(remoteDispatcher);
153    this.assignmentManager.start();
154    for (int i = 0; i < numServes; ++i) {
155      ServerName sn = ServerName.valueOf("localhost", 100 + i, 1);
156      serverManager.regionServerReport(sn, ServerMetricsBuilder.newBuilder(sn)
157        .setLastReportTimestamp(EnvironmentEdgeManager.currentTime()).build());
158    }
159    this.procedureExecutor.getEnvironment().setEventReady(initialized, true);
160  }
161
162  /**
163   * Call this restart method only after running MockMasterServices#start()
164   * The RSs can be differentiated by the port number, see
165   * ServerName in MockMasterServices#start() method above.
166   * Restart of region server will have new startcode in server name
167   *
168   * @param serverName Server name to be restarted
169   */
170  public void restartRegionServer(ServerName serverName) throws IOException {
171    List<ServerName> onlineServers = serverManager.getOnlineServersList();
172    long startCode = -1;
173    for (ServerName s : onlineServers) {
174      if (s.getAddress().equals(serverName.getAddress())) {
175        startCode = s.getStartcode() + 1;
176        break;
177      }
178    }
179    if (startCode == -1) {
180      return;
181    }
182    ServerName sn = ServerName.valueOf(serverName.getAddress().toString(), startCode);
183    serverManager.regionServerReport(sn, ServerMetricsBuilder.newBuilder(sn)
184      .setLastReportTimestamp(EnvironmentEdgeManager.currentTime()).build());
185  }
186
187  @Override
188  public void stop(String why) {
189    stopProcedureExecutor();
190    this.assignmentManager.stop();
191  }
192
193  private void startProcedureExecutor(final RSProcedureDispatcher remoteDispatcher)
194      throws IOException {
195    final Configuration conf = getConfiguration();
196    this.procedureStore = new NoopProcedureStore();
197    this.procedureStore.registerListener(new ProcedureStoreListener() {
198
199      @Override
200      public void abortProcess() {
201        abort("The Procedure Store lost the lease", null);
202      }
203    });
204
205    this.procedureEnv = new MasterProcedureEnv(this,
206       remoteDispatcher != null ? remoteDispatcher : new RSProcedureDispatcher(this));
207
208    this.procedureExecutor = new ProcedureExecutor<>(conf, procedureEnv, procedureStore,
209      procedureEnv.getProcedureScheduler());
210
211    final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
212        Math.max(Runtime.getRuntime().availableProcessors(),
213          MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
214    final boolean abortOnCorruption = conf.getBoolean(
215        MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
216        MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
217    this.procedureStore.start(numThreads);
218    ProcedureTestingUtility.initAndStartWorkers(procedureExecutor, numThreads, abortOnCorruption);
219    this.procedureEnv.getRemoteDispatcher().start();
220  }
221
222  private void stopProcedureExecutor() {
223    if (this.procedureEnv != null) {
224      this.procedureEnv.getRemoteDispatcher().stop();
225    }
226
227    if (this.procedureExecutor != null) {
228      this.procedureExecutor.stop();
229    }
230
231    if (this.procedureStore != null) {
232      this.procedureStore.stop(isAborted());
233    }
234  }
235
236  @Override
237  public boolean isInitialized() {
238    return true;
239  }
240
241  @Override
242  public ProcedureEvent<?> getInitializedEvent() {
243    return this.initialized;
244  }
245
246  @Override
247  public MasterFileSystem getMasterFileSystem() {
248    return fileSystemManager;
249  }
250
251  @Override
252  public MasterWalManager getMasterWalManager() {
253    return walManager;
254  }
255
256  @Override
257  public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
258    return procedureExecutor;
259  }
260
261  @Override
262  public LoadBalancer getLoadBalancer() {
263    return balancer;
264  }
265
266  @Override
267  public ServerManager getServerManager() {
268    return serverManager;
269  }
270
271  @Override
272  public AssignmentManager getAssignmentManager() {
273    return assignmentManager;
274  }
275
276  @Override
277  public TableStateManager getTableStateManager() {
278    return tableStateManager;
279  }
280
281  @Override
282  public Connection getConnection() {
283    return this.connection;
284  }
285
286  @Override
287  public ServerName getServerName() {
288    return MOCK_MASTER_SERVERNAME;
289  }
290
291  @Override
292  public CoordinatedStateManager getCoordinatedStateManager() {
293    return super.getCoordinatedStateManager();
294  }
295
296  private static class MockRegionStateStore extends RegionStateStore {
297    public MockRegionStateStore(MasterServices master, MasterRegion masterRegion) {
298      super(master, masterRegion);
299    }
300
301    @Override
302    public void updateRegionLocation(RegionStateNode regionNode) throws IOException {
303    }
304  }
305
306  @Override
307  public TableDescriptors getTableDescriptors() {
308    return new TableDescriptors() {
309      @Override
310      public TableDescriptor remove(TableName tablename) throws IOException {
311        // noop
312        return null;
313      }
314
315      @Override
316      public Map<String, TableDescriptor> getAll() throws IOException {
317        // noop
318        return null;
319      }
320
321      @Override
322      public TableDescriptor get(TableName tablename) throws IOException {
323        TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tablename);
324        builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(DEFAULT_COLUMN_FAMILY_NAME));
325        return builder.build();
326      }
327
328      @Override
329      public Map<String, TableDescriptor> getByNamespace(String name) throws IOException {
330        return null;
331      }
332
333      @Override
334      public void update(TableDescriptor htd, boolean cacheOnly) throws IOException {
335        // noop
336      }
337    };
338  }
339
340  private static MultiResponse buildMultiResponse(MultiRequest req) {
341    MultiResponse.Builder builder = MultiResponse.newBuilder();
342    RegionActionResult.Builder regionActionResultBuilder =
343        RegionActionResult.newBuilder();
344    ResultOrException.Builder roeBuilder = ResultOrException.newBuilder();
345    for (RegionAction regionAction: req.getRegionActionList()) {
346      regionActionResultBuilder.clear();
347      for (ClientProtos.Action action: regionAction.getActionList()) {
348        roeBuilder.clear();
349        roeBuilder.setResult(ClientProtos.Result.getDefaultInstance());
350        roeBuilder.setIndex(action.getIndex());
351        regionActionResultBuilder.addResultOrException(roeBuilder.build());
352      }
353      builder.addRegionActionResult(regionActionResultBuilder.build());
354    }
355    return builder.build();
356  }
357
358  @Override public SplitWALManager getSplitWALManager() {
359    return splitWALManager;
360  }
361}