/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SWITCH_RPC_THROTTLE;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023import static org.junit.jupiter.api.Assertions.fail;
024
025import java.io.IOException;
026import java.util.Optional;
027import java.util.Set;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030import java.util.concurrent.Future;
031import java.util.concurrent.ScheduledExecutorService;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.TimeoutException;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.RegionInfo;
038import org.apache.hadoop.hbase.client.RegionInfoBuilder;
039import org.apache.hadoop.hbase.master.MasterServices;
040import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
041import org.apache.hadoop.hbase.master.assignment.MockMasterServices;
042import org.apache.hadoop.hbase.master.assignment.OpenRegionProcedure;
043import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
044import org.apache.hadoop.hbase.procedure2.Procedure;
045import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
046import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
047import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
048import org.apache.hadoop.hbase.testclassification.MasterTests;
049import org.apache.hadoop.hbase.testclassification.MediumTests;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.junit.jupiter.api.AfterEach;
052import org.junit.jupiter.api.BeforeEach;
053import org.junit.jupiter.api.Tag;
054import org.junit.jupiter.api.Test;
055import org.junit.jupiter.api.TestInfo;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
060
061import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
062
063@Tag(MasterTests.TAG)
064@Tag(MediumTests.TAG)
065public class TestServerRemoteProcedure {
066  private static final Logger LOG = LoggerFactory.getLogger(TestServerRemoteProcedure.class);
067  private String testMethodName;
068
069  @BeforeEach
070  public void setTestMethod(TestInfo testInfo) {
071    testMethodName = testInfo.getTestMethod().get().getName();
072  }
073
074  private HBaseTestingUtil util;
075  private MockRSProcedureDispatcher rsDispatcher;
076  private MockMasterServices master;
077  private AssignmentManager am;
078  // Simple executor to run some simple tasks.
079  private ScheduledExecutorService executor;
080
081  @BeforeEach
082  public void setUp() throws Exception {
083    util = new HBaseTestingUtil();
084    this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
085      .setUncaughtExceptionHandler((t, e) -> LOG.warn("Uncaught: ", e)).build());
086    master = new MockMasterServices(util.getConfiguration());
087    rsDispatcher = new MockRSProcedureDispatcher(master);
088    rsDispatcher.setMockRsExecutor(new NoopRSExecutor());
089    master.start(2, rsDispatcher);
090    am = master.getAssignmentManager();
091    master.getServerManager().getOnlineServersList().stream()
092      .forEach(serverName -> am.getRegionStates().createServer(serverName));
093  }
094
095  @AfterEach
096  public void tearDown() throws Exception {
097    master.stop("tearDown");
098    this.executor.shutdownNow();
099  }
100
101  @Test
102  public void testSplitWALAndCrashBeforeResponse() throws Exception {
103    ServerName worker = master.getServerManager().getOnlineServersList().get(0);
104    ServerName crashedWorker = master.getServerManager().getOnlineServersList().get(1);
105    ServerRemoteProcedure splitWALRemoteProcedure =
106      new SplitWALRemoteProcedure(worker, crashedWorker, "test");
107    Future<byte[]> future = submitProcedure(splitWALRemoteProcedure);
108    Thread.sleep(2000);
109    master.getServerManager().expireServer(worker);
110    // if remoteCallFailed is called for this procedure, this procedure should be finished.
111    future.get(5000, TimeUnit.MILLISECONDS);
112    assertTrue(splitWALRemoteProcedure.isSuccess());
113  }
114
115  @Test
116  public void testRemoteCompleteAndFailedAtTheSameTime() throws Exception {
117    ServerName worker = master.getServerManager().getOnlineServersList().get(0);
118    ServerRemoteProcedure noopServerRemoteProcedure = new NoopServerRemoteProcedure(worker);
119    Future<byte[]> future = submitProcedure(noopServerRemoteProcedure);
120    Thread.sleep(2000);
121    // complete the process and fail the process at the same time
122    ExecutorService threadPool = Executors.newFixedThreadPool(2);
123    threadPool.execute(() -> noopServerRemoteProcedure
124      .remoteOperationDone(master.getMasterProcedureExecutor().getEnvironment(), null));
125    threadPool.execute(() -> noopServerRemoteProcedure.remoteCallFailed(
126      master.getMasterProcedureExecutor().getEnvironment(), worker, new IOException()));
127    future.get(2000, TimeUnit.MILLISECONDS);
128    assertTrue(noopServerRemoteProcedure.isSuccess());
129  }
130
131  @Test
132  public void testRegionOpenProcedureIsNotHandledByDispatcher() throws Exception {
133    TableName tableName = TableName.valueOf("testRegionOpenProcedureIsNotHandledByDisPatcher");
134    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(1))
135      .setEndKey(Bytes.toBytes(2)).setSplit(false).setRegionId(0).build();
136    MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
137    env.getAssignmentManager().getRegionStates().getOrCreateRegionStateNode(hri);
138    TransitRegionStateProcedure proc = TransitRegionStateProcedure.assign(env, hri, null);
139    ServerName worker = master.getServerManager().getOnlineServersList().get(0);
140    OpenRegionProcedure openRegionProcedure = new OpenRegionProcedure(proc, hri, worker);
141    Future<byte[]> future = submitProcedure(openRegionProcedure);
142    Thread.sleep(2000);
143    rsDispatcher.removeNode(worker);
144    try {
145      future.get(2000, TimeUnit.MILLISECONDS);
146      fail();
147    } catch (TimeoutException e) {
148      LOG.info("timeout is expected");
149    }
150    assertFalse(openRegionProcedure.isFinished());
151  }
152
153  private Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
154    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
155  }
156
157  private static class NoopServerRemoteProcedure extends ServerRemoteProcedure
158    implements ServerProcedureInterface {
159
160    public NoopServerRemoteProcedure(ServerName targetServer) {
161      this.targetServer = targetServer;
162    }
163
164    @Override
165    protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
166      return;
167    }
168
169    @Override
170    protected boolean abort(MasterProcedureEnv env) {
171      return false;
172    }
173
174    @Override
175    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
176      return;
177    }
178
179    @Override
180    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
181      return;
182    }
183
184    @Override
185    public Optional<RemoteProcedureDispatcher.RemoteOperation>
186      remoteCallBuild(MasterProcedureEnv env, ServerName serverName) {
187      return Optional.of(new RSProcedureDispatcher.ServerOperation(null, 0L, this.getClass(),
188        new byte[0], env.getMasterServices().getMasterActiveTime()));
189    }
190
191    @Override
192    public synchronized void remoteOperationCompleted(MasterProcedureEnv env,
193      byte[] remoteResultData) {
194      complete(env, null);
195    }
196
197    @Override
198    public synchronized void remoteOperationFailed(MasterProcedureEnv env,
199      RemoteProcedureException error) {
200      complete(env, error);
201    }
202
203    @Override
204    public boolean complete(MasterProcedureEnv env, Throwable error) {
205      return true;
206    }
207
208    @Override
209    public ServerName getServerName() {
210      return targetServer;
211    }
212
213    @Override
214    public boolean hasMetaTableRegion() {
215      return false;
216    }
217
218    @Override
219    public ServerOperationType getServerOperationType() {
220      return SWITCH_RPC_THROTTLE;
221    }
222
223  }
224
225  protected interface MockRSExecutor {
226    AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
227      AdminProtos.ExecuteProceduresRequest req) throws IOException;
228  }
229
230  protected static class NoopRSExecutor implements MockRSExecutor {
231    @Override
232    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
233      AdminProtos.ExecuteProceduresRequest req) throws IOException {
234      if (req.getOpenRegionCount() > 0) {
235        for (AdminProtos.OpenRegionRequest request : req.getOpenRegionList()) {
236          for (AdminProtos.OpenRegionRequest.RegionOpenInfo openReq : request.getOpenInfoList()) {
237            execOpenRegion(server, openReq);
238          }
239        }
240      }
241      return AdminProtos.ExecuteProceduresResponse.getDefaultInstance();
242    }
243
244    protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(ServerName server,
245      AdminProtos.OpenRegionRequest.RegionOpenInfo regionInfo) throws IOException {
246      return null;
247    }
248  }
249
250  protected static class MockRSProcedureDispatcher extends RSProcedureDispatcher {
251    private MockRSExecutor mockRsExec;
252
253    public MockRSProcedureDispatcher(final MasterServices master) {
254      super(master);
255    }
256
257    public void setMockRsExecutor(final MockRSExecutor mockRsExec) {
258      this.mockRsExec = mockRsExec;
259    }
260
261    @Override
262    protected void remoteDispatch(ServerName serverName,
263      @SuppressWarnings("rawtypes") Set<RemoteProcedure> remoteProcedures) {
264      submitTask(new MockRSProcedureDispatcher.MockRemoteCall(serverName, remoteProcedures));
265    }
266
267    private class MockRemoteCall extends ExecuteProceduresRemoteCall {
268      public MockRemoteCall(final ServerName serverName,
269        @SuppressWarnings("rawtypes") final Set<RemoteProcedure> operations) {
270        super(serverName, operations);
271      }
272
273      @Override
274      protected AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName,
275        final AdminProtos.ExecuteProceduresRequest request) throws IOException {
276        return mockRsExec.sendRequest(serverName, request);
277      }
278    }
279  }
280}