001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.master.procedure;
020
021import java.io.IOException;
022import java.util.NavigableMap;
023import java.util.Set;
024import java.util.SortedSet;
025import java.util.concurrent.ConcurrentSkipListMap;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.Executors;
028import java.util.concurrent.Future;
029import java.util.concurrent.ScheduledExecutorService;
030import java.util.concurrent.TimeUnit;
031
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseTestingUtility;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.master.MasterServices;
036import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
037import org.apache.hadoop.hbase.master.assignment.MockMasterServices;
038import org.apache.hadoop.hbase.master.replication.RefreshPeerProcedure;
039import org.apache.hadoop.hbase.procedure2.Procedure;
040import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
041import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
042import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
043import org.apache.hadoop.hbase.testclassification.MasterTests;
044import org.apache.hadoop.hbase.testclassification.MediumTests;
045import org.junit.After;
046import org.junit.Assert;
047import org.junit.Before;
048import org.junit.ClassRule;
049import org.junit.Rule;
050import org.junit.Test;
051import org.junit.experimental.categories.Category;
052import org.junit.rules.ExpectedException;
053import org.junit.rules.TestName;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
059
060@Category({ MasterTests.class, MediumTests.class })
061public class TestServerRemoteProcedure {
062  private static final Logger LOG = LoggerFactory.getLogger(TestServerRemoteProcedure.class);
063  @ClassRule
064  public static final HBaseClassTestRule CLASS_RULE =
065      HBaseClassTestRule.forClass(TestServerRemoteProcedure.class);
066  @Rule
067  public TestName name = new TestName();
068  @Rule
069  public final ExpectedException exception = ExpectedException.none();
070  protected HBaseTestingUtility util;
071  protected MockRSProcedureDispatcher rsDispatcher;
072  protected MockMasterServices master;
073  protected AssignmentManager am;
074  protected NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers =
075      new ConcurrentSkipListMap<>();
076  // Simple executor to run some simple tasks.
077  protected ScheduledExecutorService executor;
078
079  @Before
080  public void setUp() throws Exception {
081    util = new HBaseTestingUtility();
082    this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
083        .setUncaughtExceptionHandler((t, e) -> LOG.warn("Uncaught: ", e)).build());
084    master = new MockMasterServices(util.getConfiguration(), this.regionsToRegionServers);
085    rsDispatcher = new MockRSProcedureDispatcher(master);
086    rsDispatcher.setMockRsExecutor(new NoopRSExecutor());
087    master.start(2, rsDispatcher);
088    am = master.getAssignmentManager();
089  }
090
091  @After
092  public void tearDown() throws Exception {
093    master.stop("tearDown");
094    this.executor.shutdownNow();
095  }
096
097  @Test
098  public void testRemoteProcedureAndCrashBeforeResponse() throws Exception {
099    ServerName worker = master.getServerManager().getOnlineServersList().get(0);
100    ServerRemoteProcedure refreshPeerProcedure =
101        new RefreshPeerProcedure("test", PeerProcedureInterface.PeerOperationType.ADD, worker);
102    Future<byte[]> future = submitProcedure(refreshPeerProcedure);
103    Thread.sleep(2000);
104    master.getServerManager().expireServer(worker);
105    // if remoteCallFailed is called for this procedure, this procedure should be finished.
106    future.get(5000, TimeUnit.MILLISECONDS);
107    Assert.assertTrue(refreshPeerProcedure.isSuccess());
108  }
109
110  @Test
111  public void testRemoteCompleteAndFailedAtTheSameTime() throws Exception {
112    ServerName worker = master.getServerManager().getOnlineServersList().get(0);
113    ServerRemoteProcedure noopServerRemoteProcedure = new NoopServerRemoteProcedure(worker);
114    Future<byte[]> future = submitProcedure(noopServerRemoteProcedure);
115    Thread.sleep(2000);
116    // complete the process and fail the process at the same time
117    ExecutorService threadPool = Executors.newFixedThreadPool(2);
118    threadPool.execute(() -> noopServerRemoteProcedure
119        .remoteOperationDone(master.getMasterProcedureExecutor().getEnvironment(), null));
120    threadPool.execute(() -> noopServerRemoteProcedure.remoteCallFailed(
121      master.getMasterProcedureExecutor().getEnvironment(), worker, new IOException()));
122    future.get(2000, TimeUnit.MILLISECONDS);
123    Assert.assertTrue(noopServerRemoteProcedure.isSuccess());
124  }
125
126  private Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
127    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
128  }
129
130  private static class NoopServerRemoteProcedure extends ServerRemoteProcedure
131      implements PeerProcedureInterface {
132
133    public NoopServerRemoteProcedure(ServerName targetServer) {
134      this.targetServer = targetServer;
135    }
136
137    @Override
138    protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
139      return;
140    }
141
142    @Override
143    protected boolean abort(MasterProcedureEnv env) {
144      return false;
145    }
146
147    @Override
148    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
149      return;
150    }
151
152    @Override
153    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
154      return;
155    }
156
157    @Override
158    public RemoteProcedureDispatcher.RemoteOperation remoteCallBuild(MasterProcedureEnv env,
159        ServerName serverName) {
160      return new RSProcedureDispatcher.ServerOperation(null, 0L, this.getClass(), new byte[0]);
161    }
162
163    @Override
164    public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
165      complete(env, null);
166    }
167
168    @Override
169    public synchronized void remoteOperationFailed(MasterProcedureEnv env,
170        RemoteProcedureException error) {
171      complete(env, error);
172    }
173
174    @Override
175    public void complete(MasterProcedureEnv env, Throwable error) {
176      this.succ = true;
177      return;
178    }
179
180    @Override
181    public String getPeerId() {
182      return "test";
183    }
184
185    @Override
186    public PeerOperationType getPeerOperationType() {
187      return PeerOperationType.ADD;
188    }
189  }
190
191  protected interface MockRSExecutor {
192    AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
193        AdminProtos.ExecuteProceduresRequest req) throws IOException;
194  }
195
196  protected static class NoopRSExecutor implements MockRSExecutor {
197    @Override
198    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
199        AdminProtos.ExecuteProceduresRequest req) throws IOException {
200      if (req.getOpenRegionCount() > 0) {
201        for (AdminProtos.OpenRegionRequest request : req.getOpenRegionList()) {
202          for (AdminProtos.OpenRegionRequest.RegionOpenInfo openReq : request.getOpenInfoList()) {
203            execOpenRegion(server, openReq);
204          }
205        }
206      }
207      return AdminProtos.ExecuteProceduresResponse.getDefaultInstance();
208    }
209
210    protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(ServerName server,
211        AdminProtos.OpenRegionRequest.RegionOpenInfo regionInfo) throws IOException {
212      return null;
213    }
214  }
215
216  protected static class MockRSProcedureDispatcher extends RSProcedureDispatcher {
217    private MockRSExecutor mockRsExec;
218
219    public MockRSProcedureDispatcher(final MasterServices master) {
220      super(master);
221    }
222
223    public void setMockRsExecutor(final MockRSExecutor mockRsExec) {
224      this.mockRsExec = mockRsExec;
225    }
226
227    @Override
228    protected void remoteDispatch(ServerName serverName,
229        @SuppressWarnings("rawtypes") Set<RemoteProcedure> remoteProcedures) {
230      submitTask(new MockRSProcedureDispatcher.MockRemoteCall(serverName, remoteProcedures));
231    }
232
233    private class MockRemoteCall extends ExecuteProceduresRemoteCall {
234      public MockRemoteCall(final ServerName serverName,
235          @SuppressWarnings("rawtypes") final Set<RemoteProcedure> operations) {
236        super(serverName, operations);
237      }
238
239      @Override
240      protected AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName,
241          final AdminProtos.ExecuteProceduresRequest request) throws IOException {
242        return mockRsExec.sendRequest(serverName, request);
243      }
244    }
245  }
246}