001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SWITCH_RPC_THROTTLE;
021import static org.junit.Assert.fail;
022
023import java.io.IOException;
024import java.util.NavigableMap;
025import java.util.Optional;
026import java.util.Set;
027import java.util.SortedSet;
028import java.util.concurrent.ConcurrentSkipListMap;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Executors;
031import java.util.concurrent.Future;
032import java.util.concurrent.ScheduledExecutorService;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.TimeoutException;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtility;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.RegionInfo;
040import org.apache.hadoop.hbase.client.RegionInfoBuilder;
041import org.apache.hadoop.hbase.master.MasterServices;
042import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
043import org.apache.hadoop.hbase.master.assignment.MockMasterServices;
044import org.apache.hadoop.hbase.master.assignment.OpenRegionProcedure;
045import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
046import org.apache.hadoop.hbase.procedure2.Procedure;
047import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
048import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
049import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
050import org.apache.hadoop.hbase.testclassification.MasterTests;
051import org.apache.hadoop.hbase.testclassification.MediumTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.junit.After;
054import org.junit.Assert;
055import org.junit.Before;
056import org.junit.ClassRule;
057import org.junit.Rule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060import org.junit.rules.ExpectedException;
061import org.junit.rules.TestName;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
066
067import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
068
069@Category({ MasterTests.class, MediumTests.class })
070public class TestServerRemoteProcedure {
071  private static final Logger LOG = LoggerFactory.getLogger(TestServerRemoteProcedure.class);
072  @ClassRule
073  public static final HBaseClassTestRule CLASS_RULE =
074    HBaseClassTestRule.forClass(TestServerRemoteProcedure.class);
075  @Rule
076  public TestName name = new TestName();
077  @Rule
078  public final ExpectedException exception = ExpectedException.none();
079  protected HBaseTestingUtility util;
080  protected MockRSProcedureDispatcher rsDispatcher;
081  protected MockMasterServices master;
082  protected AssignmentManager am;
083  protected NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers =
084    new ConcurrentSkipListMap<>();
085  // Simple executor to run some simple tasks.
086  protected ScheduledExecutorService executor;
087
088  @Before
089  public void setUp() throws Exception {
090    util = new HBaseTestingUtility();
091    this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
092      .setUncaughtExceptionHandler((t, e) -> LOG.warn("Uncaught: ", e)).build());
093    master = new MockMasterServices(util.getConfiguration(), this.regionsToRegionServers);
094    rsDispatcher = new MockRSProcedureDispatcher(master);
095    rsDispatcher.setMockRsExecutor(new NoopRSExecutor());
096    master.start(2, rsDispatcher);
097    am = master.getAssignmentManager();
098    master.getServerManager().getOnlineServersList().stream()
099      .forEach(serverName -> am.getRegionStates().getOrCreateServer(serverName));
100  }
101
102  @After
103  public void tearDown() throws Exception {
104    master.stop("tearDown");
105    this.executor.shutdownNow();
106  }
107
108  @Test
109  public void testSplitWALAndCrashBeforeResponse() throws Exception {
110    ServerName worker = master.getServerManager().getOnlineServersList().get(0);
111    ServerName crashedWorker = master.getServerManager().getOnlineServersList().get(1);
112    ServerRemoteProcedure splitWALRemoteProcedure =
113      new SplitWALRemoteProcedure(worker, crashedWorker, "test");
114    Future<byte[]> future = submitProcedure(splitWALRemoteProcedure);
115    Thread.sleep(2000);
116    master.getServerManager().expireServer(worker);
117    // if remoteCallFailed is called for this procedure, this procedure should be finished.
118    future.get(5000, TimeUnit.MILLISECONDS);
119    Assert.assertTrue(splitWALRemoteProcedure.isSuccess());
120  }
121
122  @Test
123  public void testRemoteCompleteAndFailedAtTheSameTime() throws Exception {
124    ServerName worker = master.getServerManager().getOnlineServersList().get(0);
125    ServerRemoteProcedure noopServerRemoteProcedure = new NoopServerRemoteProcedure(worker);
126    Future<byte[]> future = submitProcedure(noopServerRemoteProcedure);
127    Thread.sleep(2000);
128    // complete the process and fail the process at the same time
129    ExecutorService threadPool = Executors.newFixedThreadPool(2);
130    threadPool.execute(() -> noopServerRemoteProcedure
131      .remoteOperationDone(master.getMasterProcedureExecutor().getEnvironment(), null));
132    threadPool.execute(() -> noopServerRemoteProcedure.remoteCallFailed(
133      master.getMasterProcedureExecutor().getEnvironment(), worker, new IOException()));
134    future.get(2000, TimeUnit.MILLISECONDS);
135    Assert.assertTrue(noopServerRemoteProcedure.isSuccess());
136  }
137
138  @Test
139  public void testRegionOpenProcedureIsNotHandledByDispatcher() throws Exception {
140    TableName tableName = TableName.valueOf("testRegionOpenProcedureIsNotHandledByDisPatcher");
141    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(1))
142      .setEndKey(Bytes.toBytes(2)).setSplit(false).setRegionId(0).build();
143    MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
144    env.getAssignmentManager().getRegionStates().getOrCreateRegionStateNode(hri);
145    TransitRegionStateProcedure proc = TransitRegionStateProcedure.assign(env, hri, null);
146    ServerName worker = master.getServerManager().getOnlineServersList().get(0);
147    OpenRegionProcedure openRegionProcedure = new OpenRegionProcedure(proc, hri, worker);
148    Future<byte[]> future = submitProcedure(openRegionProcedure);
149    Thread.sleep(2000);
150    rsDispatcher.removeNode(worker);
151    try {
152      future.get(2000, TimeUnit.MILLISECONDS);
153      fail();
154    } catch (TimeoutException e) {
155      LOG.info("timeout is expected");
156    }
157    Assert.assertFalse(openRegionProcedure.isFinished());
158  }
159
160  private Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
161    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
162  }
163
164  private static class NoopServerRemoteProcedure extends ServerRemoteProcedure
165    implements ServerProcedureInterface {
166
167    public NoopServerRemoteProcedure(ServerName targetServer) {
168      this.targetServer = targetServer;
169    }
170
171    @Override
172    protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
173      return;
174    }
175
176    @Override
177    protected boolean abort(MasterProcedureEnv env) {
178      return false;
179    }
180
181    @Override
182    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
183      return;
184    }
185
186    @Override
187    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
188      return;
189    }
190
191    @Override
192    public Optional<RemoteProcedureDispatcher.RemoteOperation>
193      remoteCallBuild(MasterProcedureEnv env, ServerName serverName) {
194      return Optional
195        .of(new RSProcedureDispatcher.ServerOperation(null, 0L, this.getClass(), new byte[0]));
196    }
197
198    @Override
199    public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
200      complete(env, null);
201    }
202
203    @Override
204    public synchronized void remoteOperationFailed(MasterProcedureEnv env,
205      RemoteProcedureException error) {
206      complete(env, error);
207    }
208
209    @Override
210    public void complete(MasterProcedureEnv env, Throwable error) {
211      this.succ = true;
212      return;
213    }
214
215    @Override
216    public ServerName getServerName() {
217      return targetServer;
218    }
219
220    @Override
221    public boolean hasMetaTableRegion() {
222      return false;
223    }
224
225    @Override
226    public ServerOperationType getServerOperationType() {
227      return SWITCH_RPC_THROTTLE;
228    }
229
230  }
231
232  protected interface MockRSExecutor {
233    AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
234      AdminProtos.ExecuteProceduresRequest req) throws IOException;
235  }
236
237  protected static class NoopRSExecutor implements MockRSExecutor {
238    @Override
239    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
240      AdminProtos.ExecuteProceduresRequest req) throws IOException {
241      if (req.getOpenRegionCount() > 0) {
242        for (AdminProtos.OpenRegionRequest request : req.getOpenRegionList()) {
243          for (AdminProtos.OpenRegionRequest.RegionOpenInfo openReq : request.getOpenInfoList()) {
244            execOpenRegion(server, openReq);
245          }
246        }
247      }
248      return AdminProtos.ExecuteProceduresResponse.getDefaultInstance();
249    }
250
251    protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(ServerName server,
252      AdminProtos.OpenRegionRequest.RegionOpenInfo regionInfo) throws IOException {
253      return null;
254    }
255  }
256
257  protected static class MockRSProcedureDispatcher extends RSProcedureDispatcher {
258    private MockRSExecutor mockRsExec;
259
260    public MockRSProcedureDispatcher(final MasterServices master) {
261      super(master);
262    }
263
264    public void setMockRsExecutor(final MockRSExecutor mockRsExec) {
265      this.mockRsExec = mockRsExec;
266    }
267
268    @Override
269    protected void remoteDispatch(ServerName serverName,
270      @SuppressWarnings("rawtypes") Set<RemoteProcedure> remoteProcedures) {
271      submitTask(new MockRSProcedureDispatcher.MockRemoteCall(serverName, remoteProcedures));
272    }
273
274    private class MockRemoteCall extends ExecuteProceduresRemoteCall {
275      public MockRemoteCall(final ServerName serverName,
276        @SuppressWarnings("rawtypes") final Set<RemoteProcedure> operations) {
277        super(serverName, operations);
278      }
279
280      @Override
281      protected AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName,
282        final AdminProtos.ExecuteProceduresRequest request) throws IOException {
283        return mockRsExec.sendRequest(serverName, request);
284      }
285    }
286  }
287}