001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
021import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
022import static org.mockito.ArgumentMatchers.any;
023
024import java.io.IOException;
025import java.util.List;
026import java.util.Map;
027import java.util.NavigableMap;
028import java.util.SortedSet;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.CoordinatedStateManager;
032import org.apache.hadoop.hbase.ServerMetricsBuilder;
033import org.apache.hadoop.hbase.ServerName;
034import org.apache.hadoop.hbase.TableDescriptors;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.ClusterConnection;
037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
038import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
039import org.apache.hadoop.hbase.client.RegionInfoBuilder;
040import org.apache.hadoop.hbase.client.TableDescriptor;
041import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
042import org.apache.hadoop.hbase.client.TableState;
043import org.apache.hadoop.hbase.master.DummyRegionServerList;
044import org.apache.hadoop.hbase.master.LoadBalancer;
045import org.apache.hadoop.hbase.master.MasterFileSystem;
046import org.apache.hadoop.hbase.master.MasterServices;
047import org.apache.hadoop.hbase.master.MasterWalManager;
048import org.apache.hadoop.hbase.master.MockNoopMasterServices;
049import org.apache.hadoop.hbase.master.ServerManager;
050import org.apache.hadoop.hbase.master.SplitWALManager;
051import org.apache.hadoop.hbase.master.TableStateManager;
052import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
053import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
054import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
055import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
056import org.apache.hadoop.hbase.master.region.MasterRegion;
057import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
058import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
059import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
060import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
061import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
062import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
063import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
064import org.apache.hadoop.hbase.security.Superusers;
065import org.apache.hadoop.hbase.util.CommonFSUtils;
066import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
067import org.apache.zookeeper.KeeperException;
068import org.mockito.Mockito;
069import org.mockito.invocation.InvocationOnMock;
070import org.mockito.stubbing.Answer;
071
072import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
073
074import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
083
084/**
085 * A mocked master services. Tries to fake it. May not always work.
086 */
087public class MockMasterServices extends MockNoopMasterServices {
088  private final MasterFileSystem fileSystemManager;
089  private final MasterWalManager walManager;
090  private final SplitWALManager splitWALManager;
091  private final AssignmentManager assignmentManager;
092  private final TableStateManager tableStateManager;
093  private final MasterRegion masterRegion;
094
095  private MasterProcedureEnv procedureEnv;
096  private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
097  private ProcedureStore procedureStore;
098  private final ClusterConnection connection;
099  private final LoadBalancer balancer;
100  private final ServerManager serverManager;
101
102  private final ProcedureEvent<?> initialized = new ProcedureEvent<>("master initialized");
103  public static final String DEFAULT_COLUMN_FAMILY_NAME = "cf";
104  public static final ServerName MOCK_MASTER_SERVERNAME =
105    ServerName.valueOf("mockmaster.example.org", 1234, -1L);
106
107  public MockMasterServices(Configuration conf,
108    NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers) throws IOException {
109    super(conf);
110    Superusers.initialize(conf);
111    this.fileSystemManager = new MasterFileSystem(conf);
112    this.walManager = new MasterWalManager(this);
113    this.splitWALManager =
114      conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
115        ? null
116        : new SplitWALManager(this);
117    this.masterRegion = MasterRegionFactory.create(this);
118    // Mock an AM.
119    this.assignmentManager =
120      new AssignmentManager(this, masterRegion, new MockRegionStateStore(this, masterRegion));
121    this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
122    this.serverManager = new ServerManager(this, new DummyRegionServerList());
123    this.tableStateManager = Mockito.mock(TableStateManager.class);
124    Mockito.when(this.tableStateManager.getTableState(Mockito.any())).thenReturn(new TableState(
125      TableName.valueOf("AnyTableNameSetInMockMasterServcies"), TableState.State.ENABLED));
126
127    // Mock up a Client Interface
128    ClientProtos.ClientService.BlockingInterface ri =
129      Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
130    MutateResponse.Builder builder = MutateResponse.newBuilder();
131    builder.setProcessed(true);
132    try {
133      Mockito.when(ri.mutate(any(), any())).thenReturn(builder.build());
134    } catch (ServiceException se) {
135      throw ProtobufUtil.handleRemoteException(se);
136    }
137    try {
138      Mockito.when(ri.multi(any(), any())).thenAnswer(new Answer<MultiResponse>() {
139        @Override
140        public MultiResponse answer(InvocationOnMock invocation) throws Throwable {
141          return buildMultiResponse(invocation.getArgument(1));
142        }
143      });
144    } catch (ServiceException se) {
145      throw ProtobufUtil.getRemoteException(se);
146    }
147    // Mock n ClusterConnection and an AdminProtocol implementation. Have the
148    // ClusterConnection return the HRI. Have the HRI return a few mocked up responses
149    // to make our test work.
150    this.connection = HConnectionTestingUtility.getMockedConnectionAndDecorate(getConfiguration(),
151      Mockito.mock(AdminProtos.AdminService.BlockingInterface.class), ri, MOCK_MASTER_SERVERNAME,
152      RegionInfoBuilder.FIRST_META_REGIONINFO);
153    // Set hbase.rootdir into test dir.
154    Path rootdir = CommonFSUtils.getRootDir(getConfiguration());
155    CommonFSUtils.setRootDir(getConfiguration(), rootdir);
156  }
157
158  public void start(final int numServes, final RSProcedureDispatcher remoteDispatcher)
159    throws IOException, KeeperException {
160    startProcedureExecutor(remoteDispatcher);
161    this.assignmentManager.start();
162    for (int i = 0; i < numServes; ++i) {
163      ServerName sn = ServerName.valueOf("localhost", 100 + i, 1);
164      serverManager.regionServerReport(sn, ServerMetricsBuilder.newBuilder(sn)
165        .setLastReportTimestamp(EnvironmentEdgeManager.currentTime()).build());
166    }
167    this.procedureExecutor.getEnvironment().setEventReady(initialized, true);
168  }
169
170  /**
171   * Call this restart method only after running MockMasterServices#start() The RSs can be
172   * differentiated by the port number, see ServerName in MockMasterServices#start() method above.
173   * Restart of region server will have new startcode in server name
174   * @param serverName Server name to be restarted
175   */
176  public void restartRegionServer(ServerName serverName) throws IOException {
177    List<ServerName> onlineServers = serverManager.getOnlineServersList();
178    long startCode = -1;
179    for (ServerName s : onlineServers) {
180      if (s.getAddress().equals(serverName.getAddress())) {
181        startCode = s.getStartcode() + 1;
182        break;
183      }
184    }
185    if (startCode == -1) {
186      return;
187    }
188    ServerName sn = ServerName.valueOf(serverName.getAddress().toString(), startCode);
189    serverManager.regionServerReport(sn, ServerMetricsBuilder.newBuilder(sn)
190      .setLastReportTimestamp(EnvironmentEdgeManager.currentTime()).build());
191  }
192
193  @Override
194  public void stop(String why) {
195    stopProcedureExecutor();
196    this.assignmentManager.stop();
197  }
198
199  private void startProcedureExecutor(final RSProcedureDispatcher remoteDispatcher)
200    throws IOException {
201    final Configuration conf = getConfiguration();
202    this.procedureStore = new NoopProcedureStore();
203    this.procedureStore.registerListener(new ProcedureStoreListener() {
204
205      @Override
206      public void abortProcess() {
207        abort("The Procedure Store lost the lease", null);
208      }
209    });
210
211    this.procedureEnv = new MasterProcedureEnv(this,
212      remoteDispatcher != null ? remoteDispatcher : new RSProcedureDispatcher(this));
213
214    this.procedureExecutor = new ProcedureExecutor<>(conf, procedureEnv, procedureStore,
215      procedureEnv.getProcedureScheduler());
216
217    final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
218      Math.max(Runtime.getRuntime().availableProcessors(),
219        MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
220    final boolean abortOnCorruption =
221      conf.getBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
222        MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
223    this.procedureStore.start(numThreads);
224    ProcedureTestingUtility.initAndStartWorkers(procedureExecutor, numThreads, abortOnCorruption);
225    this.procedureEnv.getRemoteDispatcher().start();
226  }
227
228  private void stopProcedureExecutor() {
229    if (this.procedureEnv != null) {
230      this.procedureEnv.getRemoteDispatcher().stop();
231    }
232
233    if (this.procedureExecutor != null) {
234      this.procedureExecutor.stop();
235    }
236
237    if (this.procedureStore != null) {
238      this.procedureStore.stop(isAborted());
239    }
240  }
241
242  @Override
243  public boolean isInitialized() {
244    return true;
245  }
246
247  @Override
248  public ProcedureEvent<?> getInitializedEvent() {
249    return this.initialized;
250  }
251
252  @Override
253  public MasterFileSystem getMasterFileSystem() {
254    return fileSystemManager;
255  }
256
257  @Override
258  public MasterWalManager getMasterWalManager() {
259    return walManager;
260  }
261
262  @Override
263  public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
264    return procedureExecutor;
265  }
266
267  @Override
268  public LoadBalancer getLoadBalancer() {
269    return balancer;
270  }
271
272  @Override
273  public ServerManager getServerManager() {
274    return serverManager;
275  }
276
277  @Override
278  public AssignmentManager getAssignmentManager() {
279    return assignmentManager;
280  }
281
282  @Override
283  public TableStateManager getTableStateManager() {
284    return tableStateManager;
285  }
286
287  @Override
288  public ClusterConnection getConnection() {
289    return this.connection;
290  }
291
292  @Override
293  public ServerName getServerName() {
294    return MOCK_MASTER_SERVERNAME;
295  }
296
297  @Override
298  public CoordinatedStateManager getCoordinatedStateManager() {
299    return super.getCoordinatedStateManager();
300  }
301
302  private static class MockRegionStateStore extends RegionStateStore {
303    public MockRegionStateStore(MasterServices master, MasterRegion masterRegion) {
304      super(master, masterRegion);
305    }
306
307    @Override
308    public void updateRegionLocation(RegionStateNode regionNode) throws IOException {
309    }
310  }
311
312  @Override
313  public TableDescriptors getTableDescriptors() {
314    return new TableDescriptors() {
315      @Override
316      public TableDescriptor remove(TableName tablename) throws IOException {
317        // noop
318        return null;
319      }
320
321      @Override
322      public Map<String, TableDescriptor> getAll() throws IOException {
323        // noop
324        return null;
325      }
326
327      @Override
328      public TableDescriptor get(TableName tablename) throws IOException {
329        TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tablename);
330        builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(DEFAULT_COLUMN_FAMILY_NAME));
331        return builder.build();
332      }
333
334      @Override
335      public Map<String, TableDescriptor> getByNamespace(String name) throws IOException {
336        return null;
337      }
338
339      @Override
340      public void update(TableDescriptor htd, boolean cacheOnly) throws IOException {
341        // noop
342      }
343    };
344  }
345
346  private static MultiResponse buildMultiResponse(MultiRequest req) {
347    MultiResponse.Builder builder = MultiResponse.newBuilder();
348    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
349    ResultOrException.Builder roeBuilder = ResultOrException.newBuilder();
350    for (RegionAction regionAction : req.getRegionActionList()) {
351      regionActionResultBuilder.clear();
352      for (ClientProtos.Action action : regionAction.getActionList()) {
353        roeBuilder.clear();
354        roeBuilder.setResult(ClientProtos.Result.getDefaultInstance());
355        roeBuilder.setIndex(action.getIndex());
356        regionActionResultBuilder.addResultOrException(roeBuilder.build());
357      }
358      builder.addRegionActionResult(regionActionResultBuilder.build());
359    }
360    return builder.build();
361  }
362
363  @Override
364  public SplitWALManager getSplitWALManager() {
365    return splitWALManager;
366  }
367}