001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.master.procedure;
020
021import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SWITCH_RPC_THROTTLE;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import java.util.NavigableMap;
026import java.util.Optional;
027import java.util.Set;
028import java.util.SortedSet;
029import java.util.concurrent.ConcurrentSkipListMap;
030import java.util.concurrent.ExecutorService;
031import java.util.concurrent.Executors;
032import java.util.concurrent.Future;
033import java.util.concurrent.ScheduledExecutorService;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.TimeoutException;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseTestingUtility;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.RegionInfo;
041import org.apache.hadoop.hbase.client.RegionInfoBuilder;
042import org.apache.hadoop.hbase.master.MasterServices;
043import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
044import org.apache.hadoop.hbase.master.assignment.MockMasterServices;
045import org.apache.hadoop.hbase.master.assignment.OpenRegionProcedure;
046import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
047import org.apache.hadoop.hbase.procedure2.Procedure;
048import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
049import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
050import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
051import org.apache.hadoop.hbase.testclassification.MasterTests;
052import org.apache.hadoop.hbase.testclassification.MediumTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.junit.After;
055import org.junit.Assert;
056import org.junit.Before;
057import org.junit.ClassRule;
058import org.junit.Rule;
059import org.junit.Test;
060import org.junit.experimental.categories.Category;
061import org.junit.rules.ExpectedException;
062import org.junit.rules.TestName;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
067
068import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
069
070@Category({ MasterTests.class, MediumTests.class })
071public class TestServerRemoteProcedure {
072  private static final Logger LOG = LoggerFactory.getLogger(TestServerRemoteProcedure.class);
073  @ClassRule
074  public static final HBaseClassTestRule CLASS_RULE =
075      HBaseClassTestRule.forClass(TestServerRemoteProcedure.class);
076  @Rule
077  public TestName name = new TestName();
078  @Rule
079  public final ExpectedException exception = ExpectedException.none();
080  protected HBaseTestingUtility util;
081  protected MockRSProcedureDispatcher rsDispatcher;
082  protected MockMasterServices master;
083  protected AssignmentManager am;
084  protected NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers =
085      new ConcurrentSkipListMap<>();
086  // Simple executor to run some simple tasks.
087  protected ScheduledExecutorService executor;
088
089  @Before
090  public void setUp() throws Exception {
091    util = new HBaseTestingUtility();
092    this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
093        .setUncaughtExceptionHandler((t, e) -> LOG.warn("Uncaught: ", e)).build());
094    master = new MockMasterServices(util.getConfiguration(), this.regionsToRegionServers);
095    rsDispatcher = new MockRSProcedureDispatcher(master);
096    rsDispatcher.setMockRsExecutor(new NoopRSExecutor());
097    master.start(2, rsDispatcher);
098    am = master.getAssignmentManager();
099    master.getServerManager().getOnlineServersList().stream()
100        .forEach(serverName -> am.getRegionStates().getOrCreateServer(serverName));
101  }
102
103  @After
104  public void tearDown() throws Exception {
105    master.stop("tearDown");
106    this.executor.shutdownNow();
107  }
108
109  @Test
110  public void testSplitWALAndCrashBeforeResponse() throws Exception {
111    ServerName worker = master.getServerManager().getOnlineServersList().get(0);
112    ServerName crashedWorker = master.getServerManager().getOnlineServersList().get(1);
113    ServerRemoteProcedure splitWALRemoteProcedure =
114        new SplitWALRemoteProcedure(worker, crashedWorker, "test");
115    Future<byte[]> future = submitProcedure(splitWALRemoteProcedure);
116    Thread.sleep(2000);
117    master.getServerManager().expireServer(worker);
118    // if remoteCallFailed is called for this procedure, this procedure should be finished.
119    future.get(5000, TimeUnit.MILLISECONDS);
120    Assert.assertTrue(splitWALRemoteProcedure.isSuccess());
121  }
122
123  @Test
124  public void testRemoteCompleteAndFailedAtTheSameTime() throws Exception {
125    ServerName worker = master.getServerManager().getOnlineServersList().get(0);
126    ServerRemoteProcedure noopServerRemoteProcedure = new NoopServerRemoteProcedure(worker);
127    Future<byte[]> future = submitProcedure(noopServerRemoteProcedure);
128    Thread.sleep(2000);
129    // complete the process and fail the process at the same time
130    ExecutorService threadPool = Executors.newFixedThreadPool(2);
131    threadPool.execute(() -> noopServerRemoteProcedure
132        .remoteOperationDone(master.getMasterProcedureExecutor().getEnvironment(), null));
133    threadPool.execute(() -> noopServerRemoteProcedure.remoteCallFailed(
134      master.getMasterProcedureExecutor().getEnvironment(), worker, new IOException()));
135    future.get(2000, TimeUnit.MILLISECONDS);
136    Assert.assertTrue(noopServerRemoteProcedure.isSuccess());
137  }
138
139  @Test
140  public void testRegionOpenProcedureIsNotHandledByDispatcher() throws Exception {
141    TableName tableName = TableName.valueOf("testRegionOpenProcedureIsNotHandledByDisPatcher");
142    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(1))
143      .setEndKey(Bytes.toBytes(2)).setSplit(false).setRegionId(0).build();
144    MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
145    env.getAssignmentManager().getRegionStates().getOrCreateRegionStateNode(hri);
146    TransitRegionStateProcedure proc = TransitRegionStateProcedure.assign(env, hri, null);
147    ServerName worker = master.getServerManager().getOnlineServersList().get(0);
148    OpenRegionProcedure openRegionProcedure = new OpenRegionProcedure(proc, hri, worker);
149    Future<byte[]> future = submitProcedure(openRegionProcedure);
150    Thread.sleep(2000);
151    rsDispatcher.removeNode(worker);
152    try {
153      future.get(2000, TimeUnit.MILLISECONDS);
154      fail();
155    } catch (TimeoutException e) {
156      LOG.info("timeout is expected");
157    }
158    Assert.assertFalse(openRegionProcedure.isFinished());
159  }
160
161  private Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
162    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
163  }
164
165  private static class NoopServerRemoteProcedure extends ServerRemoteProcedure
166      implements ServerProcedureInterface {
167
168    public NoopServerRemoteProcedure(ServerName targetServer) {
169      this.targetServer = targetServer;
170    }
171
172    @Override
173    protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
174      return;
175    }
176
177    @Override
178    protected boolean abort(MasterProcedureEnv env) {
179      return false;
180    }
181
182    @Override
183    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
184      return;
185    }
186
187    @Override
188    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
189      return;
190    }
191
192    @Override
193    public Optional<RemoteProcedureDispatcher.RemoteOperation> remoteCallBuild(
194        MasterProcedureEnv env, ServerName serverName) {
195      return Optional
196          .of(new RSProcedureDispatcher.ServerOperation(null, 0L, this.getClass(), new byte[0]));
197    }
198
199    @Override
200    public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
201      complete(env, null);
202    }
203
204    @Override
205    public synchronized void remoteOperationFailed(MasterProcedureEnv env,
206        RemoteProcedureException error) {
207      complete(env, error);
208    }
209
210    @Override
211    public void complete(MasterProcedureEnv env, Throwable error) {
212      this.succ = true;
213      return;
214    }
215
216    @Override
217    public ServerName getServerName() {
218      return targetServer;
219    }
220
221    @Override
222    public boolean hasMetaTableRegion() {
223      return false;
224    }
225
226    @Override
227    public ServerOperationType getServerOperationType() {
228      return SWITCH_RPC_THROTTLE;
229    }
230
231  }
232
233  protected interface MockRSExecutor {
234    AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
235        AdminProtos.ExecuteProceduresRequest req) throws IOException;
236  }
237
238  protected static class NoopRSExecutor implements MockRSExecutor {
239    @Override
240    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
241        AdminProtos.ExecuteProceduresRequest req) throws IOException {
242      if (req.getOpenRegionCount() > 0) {
243        for (AdminProtos.OpenRegionRequest request : req.getOpenRegionList()) {
244          for (AdminProtos.OpenRegionRequest.RegionOpenInfo openReq : request.getOpenInfoList()) {
245            execOpenRegion(server, openReq);
246          }
247        }
248      }
249      return AdminProtos.ExecuteProceduresResponse.getDefaultInstance();
250    }
251
252    protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(ServerName server,
253        AdminProtos.OpenRegionRequest.RegionOpenInfo regionInfo) throws IOException {
254      return null;
255    }
256  }
257
258  protected static class MockRSProcedureDispatcher extends RSProcedureDispatcher {
259    private MockRSExecutor mockRsExec;
260
261    public MockRSProcedureDispatcher(final MasterServices master) {
262      super(master);
263    }
264
265    public void setMockRsExecutor(final MockRSExecutor mockRsExec) {
266      this.mockRsExec = mockRsExec;
267    }
268
269    @Override
270    protected void remoteDispatch(ServerName serverName,
271        @SuppressWarnings("rawtypes") Set<RemoteProcedure> remoteProcedures) {
272      submitTask(new MockRSProcedureDispatcher.MockRemoteCall(serverName, remoteProcedures));
273    }
274
275    private class MockRemoteCall extends ExecuteProceduresRemoteCall {
276      public MockRemoteCall(final ServerName serverName,
277          @SuppressWarnings("rawtypes") final Set<RemoteProcedure> operations) {
278        super(serverName, operations);
279      }
280
281      @Override
282      protected AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName,
283          final AdminProtos.ExecuteProceduresRequest request) throws IOException {
284        return mockRsExec.sendRequest(serverName, request);
285      }
286    }
287  }
288}