001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.master.procedure; 020 021import java.io.IOException; 022import java.util.NavigableMap; 023import java.util.Set; 024import java.util.SortedSet; 025import java.util.concurrent.ConcurrentSkipListMap; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.Executors; 028import java.util.concurrent.Future; 029import java.util.concurrent.ScheduledExecutorService; 030import java.util.concurrent.TimeUnit; 031 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.HBaseTestingUtility; 034import org.apache.hadoop.hbase.ServerName; 035import org.apache.hadoop.hbase.master.MasterServices; 036import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 037import org.apache.hadoop.hbase.master.assignment.MockMasterServices; 038import org.apache.hadoop.hbase.master.replication.RefreshPeerProcedure; 039import org.apache.hadoop.hbase.procedure2.Procedure; 040import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 041import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; 042import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; 043import org.apache.hadoop.hbase.testclassification.MasterTests; 044import org.apache.hadoop.hbase.testclassification.MediumTests; 045import org.junit.After; 046import org.junit.Assert; 047import org.junit.Before; 048import org.junit.ClassRule; 049import org.junit.Rule; 050import org.junit.Test; 051import org.junit.experimental.categories.Category; 052import org.junit.rules.ExpectedException; 053import org.junit.rules.TestName; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 058import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 059 060@Category({ MasterTests.class, MediumTests.class }) 061public class TestServerRemoteProcedure { 062 private static final Logger LOG = LoggerFactory.getLogger(TestServerRemoteProcedure.class); 063 @ClassRule 064 public static final HBaseClassTestRule CLASS_RULE = 065 HBaseClassTestRule.forClass(TestServerRemoteProcedure.class); 066 @Rule 067 public TestName name = new TestName(); 068 @Rule 069 public final ExpectedException exception = ExpectedException.none(); 070 protected HBaseTestingUtility util; 071 protected MockRSProcedureDispatcher rsDispatcher; 072 protected MockMasterServices master; 073 protected AssignmentManager am; 074 protected NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers = 075 new ConcurrentSkipListMap<>(); 076 // Simple executor to run some simple tasks. 077 protected ScheduledExecutorService executor; 078 079 @Before 080 public void setUp() throws Exception { 081 util = new HBaseTestingUtility(); 082 this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() 083 .setUncaughtExceptionHandler((t, e) -> LOG.warn("Uncaught: ", e)).build()); 084 master = new MockMasterServices(util.getConfiguration(), this.regionsToRegionServers); 085 rsDispatcher = new MockRSProcedureDispatcher(master); 086 rsDispatcher.setMockRsExecutor(new NoopRSExecutor()); 087 master.start(2, rsDispatcher); 088 am = master.getAssignmentManager(); 089 } 090 091 @After 092 public void tearDown() throws Exception { 093 master.stop("tearDown"); 094 this.executor.shutdownNow(); 095 } 096 097 @Test 098 public void testRemoteProcedureAndCrashBeforeResponse() throws Exception { 099 ServerName worker = master.getServerManager().getOnlineServersList().get(0); 100 ServerRemoteProcedure refreshPeerProcedure = 101 new RefreshPeerProcedure("test", PeerProcedureInterface.PeerOperationType.ADD, worker); 102 Future<byte[]> future = submitProcedure(refreshPeerProcedure); 103 Thread.sleep(2000); 104 master.getServerManager().expireServer(worker); 105 // if remoteCallFailed is called for this procedure, this procedure should be finished. 106 future.get(5000, TimeUnit.MILLISECONDS); 107 Assert.assertTrue(refreshPeerProcedure.isSuccess()); 108 } 109 110 @Test 111 public void testRemoteCompleteAndFailedAtTheSameTime() throws Exception { 112 ServerName worker = master.getServerManager().getOnlineServersList().get(0); 113 ServerRemoteProcedure noopServerRemoteProcedure = new NoopServerRemoteProcedure(worker); 114 Future<byte[]> future = submitProcedure(noopServerRemoteProcedure); 115 Thread.sleep(2000); 116 // complete the process and fail the process at the same time 117 ExecutorService threadPool = Executors.newFixedThreadPool(2); 118 threadPool.execute(() -> noopServerRemoteProcedure 119 .remoteOperationDone(master.getMasterProcedureExecutor().getEnvironment(), null)); 120 threadPool.execute(() -> noopServerRemoteProcedure.remoteCallFailed( 121 master.getMasterProcedureExecutor().getEnvironment(), worker, new IOException())); 122 future.get(2000, TimeUnit.MILLISECONDS); 123 Assert.assertTrue(noopServerRemoteProcedure.isSuccess()); 124 } 125 126 private Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) { 127 return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc); 128 } 129 130 private static class NoopServerRemoteProcedure extends ServerRemoteProcedure 131 implements PeerProcedureInterface { 132 133 public NoopServerRemoteProcedure(ServerName targetServer) { 134 this.targetServer = targetServer; 135 } 136 137 @Override 138 protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { 139 return; 140 } 141 142 @Override 143 protected boolean abort(MasterProcedureEnv env) { 144 return false; 145 } 146 147 @Override 148 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 149 return; 150 } 151 152 @Override 153 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 154 return; 155 } 156 157 @Override 158 public RemoteProcedureDispatcher.RemoteOperation remoteCallBuild(MasterProcedureEnv env, 159 ServerName serverName) { 160 return new RSProcedureDispatcher.ServerOperation(null, 0L, this.getClass(), new byte[0]); 161 } 162 163 @Override 164 public synchronized void remoteOperationCompleted(MasterProcedureEnv env) { 165 complete(env, null); 166 } 167 168 @Override 169 public synchronized void remoteOperationFailed(MasterProcedureEnv env, 170 RemoteProcedureException error) { 171 complete(env, error); 172 } 173 174 @Override 175 public void complete(MasterProcedureEnv env, Throwable error) { 176 this.succ = true; 177 return; 178 } 179 180 @Override 181 public String getPeerId() { 182 return "test"; 183 } 184 185 @Override 186 public PeerOperationType getPeerOperationType() { 187 return PeerOperationType.ADD; 188 } 189 } 190 191 protected interface MockRSExecutor { 192 AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server, 193 AdminProtos.ExecuteProceduresRequest req) throws IOException; 194 } 195 196 protected static class NoopRSExecutor implements MockRSExecutor { 197 @Override 198 public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server, 199 AdminProtos.ExecuteProceduresRequest req) throws IOException { 200 if (req.getOpenRegionCount() > 0) { 201 for (AdminProtos.OpenRegionRequest request : req.getOpenRegionList()) { 202 for (AdminProtos.OpenRegionRequest.RegionOpenInfo openReq : request.getOpenInfoList()) { 203 execOpenRegion(server, openReq); 204 } 205 } 206 } 207 return AdminProtos.ExecuteProceduresResponse.getDefaultInstance(); 208 } 209 210 protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(ServerName server, 211 AdminProtos.OpenRegionRequest.RegionOpenInfo regionInfo) throws IOException { 212 return null; 213 } 214 } 215 216 protected static class MockRSProcedureDispatcher extends RSProcedureDispatcher { 217 private MockRSExecutor mockRsExec; 218 219 public MockRSProcedureDispatcher(final MasterServices master) { 220 super(master); 221 } 222 223 public void setMockRsExecutor(final MockRSExecutor mockRsExec) { 224 this.mockRsExec = mockRsExec; 225 } 226 227 @Override 228 protected void remoteDispatch(ServerName serverName, 229 @SuppressWarnings("rawtypes") Set<RemoteProcedure> remoteProcedures) { 230 submitTask(new MockRSProcedureDispatcher.MockRemoteCall(serverName, remoteProcedures)); 231 } 232 233 private class MockRemoteCall extends ExecuteProceduresRemoteCall { 234 public MockRemoteCall(final ServerName serverName, 235 @SuppressWarnings("rawtypes") final Set<RemoteProcedure> operations) { 236 super(serverName, operations); 237 } 238 239 @Override 240 protected AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName, 241 final AdminProtos.ExecuteProceduresRequest request) throws IOException { 242 return mockRsExec.sendRequest(serverName, request); 243 } 244 } 245 } 246}