001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SWITCH_RPC_THROTTLE;
021import static org.junit.Assert.fail;
022
023import java.io.IOException;
024import java.util.Optional;
025import java.util.Set;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.Executors;
028import java.util.concurrent.Future;
029import java.util.concurrent.ScheduledExecutorService;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.TimeoutException;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseTestingUtil;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.client.RegionInfoBuilder;
038import org.apache.hadoop.hbase.master.MasterServices;
039import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
040import org.apache.hadoop.hbase.master.assignment.MockMasterServices;
041import org.apache.hadoop.hbase.master.assignment.OpenRegionProcedure;
042import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
043import org.apache.hadoop.hbase.procedure2.Procedure;
044import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
045import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
046import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
047import org.apache.hadoop.hbase.testclassification.MasterTests;
048import org.apache.hadoop.hbase.testclassification.MediumTests;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.junit.After;
051import org.junit.Assert;
052import org.junit.Before;
053import org.junit.ClassRule;
054import org.junit.Rule;
055import org.junit.Test;
056import org.junit.experimental.categories.Category;
057import org.junit.rules.TestName;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
062
063import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
064
065@Category({ MasterTests.class, MediumTests.class })
066public class TestServerRemoteProcedure {
067  private static final Logger LOG = LoggerFactory.getLogger(TestServerRemoteProcedure.class);
068  @ClassRule
069  public static final HBaseClassTestRule CLASS_RULE =
070    HBaseClassTestRule.forClass(TestServerRemoteProcedure.class);
071  @Rule
072  public TestName name = new TestName();
073  private HBaseTestingUtil util;
074  private MockRSProcedureDispatcher rsDispatcher;
075  private MockMasterServices master;
076  private AssignmentManager am;
077  // Simple executor to run some simple tasks.
078  private ScheduledExecutorService executor;
079
080  @Before
081  public void setUp() throws Exception {
082    util = new HBaseTestingUtil();
083    this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
084      .setUncaughtExceptionHandler((t, e) -> LOG.warn("Uncaught: ", e)).build());
085    master = new MockMasterServices(util.getConfiguration());
086    rsDispatcher = new MockRSProcedureDispatcher(master);
087    rsDispatcher.setMockRsExecutor(new NoopRSExecutor());
088    master.start(2, rsDispatcher);
089    am = master.getAssignmentManager();
090    master.getServerManager().getOnlineServersList().stream()
091      .forEach(serverName -> am.getRegionStates().createServer(serverName));
092  }
093
094  @After
095  public void tearDown() throws Exception {
096    master.stop("tearDown");
097    this.executor.shutdownNow();
098  }
099
100  @Test
101  public void testSplitWALAndCrashBeforeResponse() throws Exception {
102    ServerName worker = master.getServerManager().getOnlineServersList().get(0);
103    ServerName crashedWorker = master.getServerManager().getOnlineServersList().get(1);
104    ServerRemoteProcedure splitWALRemoteProcedure =
105      new SplitWALRemoteProcedure(worker, crashedWorker, "test");
106    Future<byte[]> future = submitProcedure(splitWALRemoteProcedure);
107    Thread.sleep(2000);
108    master.getServerManager().expireServer(worker);
109    // if remoteCallFailed is called for this procedure, this procedure should be finished.
110    future.get(5000, TimeUnit.MILLISECONDS);
111    Assert.assertTrue(splitWALRemoteProcedure.isSuccess());
112  }
113
114  @Test
115  public void testRemoteCompleteAndFailedAtTheSameTime() throws Exception {
116    ServerName worker = master.getServerManager().getOnlineServersList().get(0);
117    ServerRemoteProcedure noopServerRemoteProcedure = new NoopServerRemoteProcedure(worker);
118    Future<byte[]> future = submitProcedure(noopServerRemoteProcedure);
119    Thread.sleep(2000);
120    // complete the process and fail the process at the same time
121    ExecutorService threadPool = Executors.newFixedThreadPool(2);
122    threadPool.execute(() -> noopServerRemoteProcedure
123      .remoteOperationDone(master.getMasterProcedureExecutor().getEnvironment(), null));
124    threadPool.execute(() -> noopServerRemoteProcedure.remoteCallFailed(
125      master.getMasterProcedureExecutor().getEnvironment(), worker, new IOException()));
126    future.get(2000, TimeUnit.MILLISECONDS);
127    Assert.assertTrue(noopServerRemoteProcedure.isSuccess());
128  }
129
130  @Test
131  public void testRegionOpenProcedureIsNotHandledByDispatcher() throws Exception {
132    TableName tableName = TableName.valueOf("testRegionOpenProcedureIsNotHandledByDisPatcher");
133    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(1))
134      .setEndKey(Bytes.toBytes(2)).setSplit(false).setRegionId(0).build();
135    MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
136    env.getAssignmentManager().getRegionStates().getOrCreateRegionStateNode(hri);
137    TransitRegionStateProcedure proc = TransitRegionStateProcedure.assign(env, hri, null);
138    ServerName worker = master.getServerManager().getOnlineServersList().get(0);
139    OpenRegionProcedure openRegionProcedure = new OpenRegionProcedure(proc, hri, worker);
140    Future<byte[]> future = submitProcedure(openRegionProcedure);
141    Thread.sleep(2000);
142    rsDispatcher.removeNode(worker);
143    try {
144      future.get(2000, TimeUnit.MILLISECONDS);
145      fail();
146    } catch (TimeoutException e) {
147      LOG.info("timeout is expected");
148    }
149    Assert.assertFalse(openRegionProcedure.isFinished());
150  }
151
152  private Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
153    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
154  }
155
156  private static class NoopServerRemoteProcedure extends ServerRemoteProcedure
157    implements ServerProcedureInterface {
158
159    public NoopServerRemoteProcedure(ServerName targetServer) {
160      this.targetServer = targetServer;
161    }
162
163    @Override
164    protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
165      return;
166    }
167
168    @Override
169    protected boolean abort(MasterProcedureEnv env) {
170      return false;
171    }
172
173    @Override
174    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
175      return;
176    }
177
178    @Override
179    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
180      return;
181    }
182
183    @Override
184    public Optional<RemoteProcedureDispatcher.RemoteOperation>
185      remoteCallBuild(MasterProcedureEnv env, ServerName serverName) {
186      return Optional.of(new RSProcedureDispatcher.ServerOperation(null, 0L, this.getClass(),
187        new byte[0], env.getMasterServices().getMasterActiveTime()));
188    }
189
190    @Override
191    public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
192      complete(env, null);
193    }
194
195    @Override
196    public synchronized void remoteOperationFailed(MasterProcedureEnv env,
197      RemoteProcedureException error) {
198      complete(env, error);
199    }
200
201    @Override
202    public boolean complete(MasterProcedureEnv env, Throwable error) {
203      return true;
204    }
205
206    @Override
207    public ServerName getServerName() {
208      return targetServer;
209    }
210
211    @Override
212    public boolean hasMetaTableRegion() {
213      return false;
214    }
215
216    @Override
217    public ServerOperationType getServerOperationType() {
218      return SWITCH_RPC_THROTTLE;
219    }
220
221  }
222
223  protected interface MockRSExecutor {
224    AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
225      AdminProtos.ExecuteProceduresRequest req) throws IOException;
226  }
227
228  protected static class NoopRSExecutor implements MockRSExecutor {
229    @Override
230    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
231      AdminProtos.ExecuteProceduresRequest req) throws IOException {
232      if (req.getOpenRegionCount() > 0) {
233        for (AdminProtos.OpenRegionRequest request : req.getOpenRegionList()) {
234          for (AdminProtos.OpenRegionRequest.RegionOpenInfo openReq : request.getOpenInfoList()) {
235            execOpenRegion(server, openReq);
236          }
237        }
238      }
239      return AdminProtos.ExecuteProceduresResponse.getDefaultInstance();
240    }
241
242    protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(ServerName server,
243      AdminProtos.OpenRegionRequest.RegionOpenInfo regionInfo) throws IOException {
244      return null;
245    }
246  }
247
248  protected static class MockRSProcedureDispatcher extends RSProcedureDispatcher {
249    private MockRSExecutor mockRsExec;
250
251    public MockRSProcedureDispatcher(final MasterServices master) {
252      super(master);
253    }
254
255    public void setMockRsExecutor(final MockRSExecutor mockRsExec) {
256      this.mockRsExec = mockRsExec;
257    }
258
259    @Override
260    protected void remoteDispatch(ServerName serverName,
261      @SuppressWarnings("rawtypes") Set<RemoteProcedure> remoteProcedures) {
262      submitTask(new MockRSProcedureDispatcher.MockRemoteCall(serverName, remoteProcedures));
263    }
264
265    private class MockRemoteCall extends ExecuteProceduresRemoteCall {
266      public MockRemoteCall(final ServerName serverName,
267        @SuppressWarnings("rawtypes") final Set<RemoteProcedure> operations) {
268        super(serverName, operations);
269      }
270
271      @Override
272      protected AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName,
273        final AdminProtos.ExecuteProceduresRequest request) throws IOException {
274        return mockRsExec.sendRequest(serverName, request);
275      }
276    }
277  }
278}