001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.master.procedure; 020 021import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SWITCH_RPC_THROTTLE; 022import static org.junit.Assert.fail; 023 024import java.io.IOException; 025import java.util.NavigableMap; 026import java.util.Optional; 027import java.util.Set; 028import java.util.SortedSet; 029import java.util.concurrent.ConcurrentSkipListMap; 030import java.util.concurrent.ExecutorService; 031import java.util.concurrent.Executors; 032import java.util.concurrent.Future; 033import java.util.concurrent.ScheduledExecutorService; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.TimeoutException; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseTestingUtility; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.RegionInfo; 041import org.apache.hadoop.hbase.client.RegionInfoBuilder; 042import org.apache.hadoop.hbase.master.MasterServices; 043import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 044import org.apache.hadoop.hbase.master.assignment.MockMasterServices; 045import org.apache.hadoop.hbase.master.assignment.OpenRegionProcedure; 046import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure; 047import org.apache.hadoop.hbase.procedure2.Procedure; 048import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 049import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; 050import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; 051import org.apache.hadoop.hbase.testclassification.MasterTests; 052import org.apache.hadoop.hbase.testclassification.MediumTests; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.junit.After; 055import org.junit.Assert; 056import org.junit.Before; 057import org.junit.ClassRule; 058import org.junit.Rule; 059import org.junit.Test; 060import org.junit.experimental.categories.Category; 061import org.junit.rules.ExpectedException; 062import org.junit.rules.TestName; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 067 068import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 069 070@Category({ MasterTests.class, MediumTests.class }) 071public class TestServerRemoteProcedure { 072 private static final Logger LOG = LoggerFactory.getLogger(TestServerRemoteProcedure.class); 073 @ClassRule 074 public static final HBaseClassTestRule CLASS_RULE = 075 HBaseClassTestRule.forClass(TestServerRemoteProcedure.class); 076 @Rule 077 public TestName name = new TestName(); 078 @Rule 079 public final ExpectedException exception = ExpectedException.none(); 080 protected HBaseTestingUtility util; 081 protected MockRSProcedureDispatcher rsDispatcher; 082 protected MockMasterServices master; 083 protected AssignmentManager am; 084 protected NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers = 085 new ConcurrentSkipListMap<>(); 086 // Simple executor to run some simple tasks. 087 protected ScheduledExecutorService executor; 088 089 @Before 090 public void setUp() throws Exception { 091 util = new HBaseTestingUtility(); 092 this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() 093 .setUncaughtExceptionHandler((t, e) -> LOG.warn("Uncaught: ", e)).build()); 094 master = new MockMasterServices(util.getConfiguration(), this.regionsToRegionServers); 095 rsDispatcher = new MockRSProcedureDispatcher(master); 096 rsDispatcher.setMockRsExecutor(new NoopRSExecutor()); 097 master.start(2, rsDispatcher); 098 am = master.getAssignmentManager(); 099 master.getServerManager().getOnlineServersList().stream() 100 .forEach(serverName -> am.getRegionStates().getOrCreateServer(serverName)); 101 } 102 103 @After 104 public void tearDown() throws Exception { 105 master.stop("tearDown"); 106 this.executor.shutdownNow(); 107 } 108 109 @Test 110 public void testSplitWALAndCrashBeforeResponse() throws Exception { 111 ServerName worker = master.getServerManager().getOnlineServersList().get(0); 112 ServerName crashedWorker = master.getServerManager().getOnlineServersList().get(1); 113 ServerRemoteProcedure splitWALRemoteProcedure = 114 new SplitWALRemoteProcedure(worker, crashedWorker, "test"); 115 Future<byte[]> future = submitProcedure(splitWALRemoteProcedure); 116 Thread.sleep(2000); 117 master.getServerManager().expireServer(worker); 118 // if remoteCallFailed is called for this procedure, this procedure should be finished. 119 future.get(5000, TimeUnit.MILLISECONDS); 120 Assert.assertTrue(splitWALRemoteProcedure.isSuccess()); 121 } 122 123 @Test 124 public void testRemoteCompleteAndFailedAtTheSameTime() throws Exception { 125 ServerName worker = master.getServerManager().getOnlineServersList().get(0); 126 ServerRemoteProcedure noopServerRemoteProcedure = new NoopServerRemoteProcedure(worker); 127 Future<byte[]> future = submitProcedure(noopServerRemoteProcedure); 128 Thread.sleep(2000); 129 // complete the process and fail the process at the same time 130 ExecutorService threadPool = Executors.newFixedThreadPool(2); 131 threadPool.execute(() -> noopServerRemoteProcedure 132 .remoteOperationDone(master.getMasterProcedureExecutor().getEnvironment(), null)); 133 threadPool.execute(() -> noopServerRemoteProcedure.remoteCallFailed( 134 master.getMasterProcedureExecutor().getEnvironment(), worker, new IOException())); 135 future.get(2000, TimeUnit.MILLISECONDS); 136 Assert.assertTrue(noopServerRemoteProcedure.isSuccess()); 137 } 138 139 @Test 140 public void testRegionOpenProcedureIsNotHandledByDispatcher() throws Exception { 141 TableName tableName = TableName.valueOf("testRegionOpenProcedureIsNotHandledByDisPatcher"); 142 RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(1)) 143 .setEndKey(Bytes.toBytes(2)).setSplit(false).setRegionId(0).build(); 144 MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment(); 145 env.getAssignmentManager().getRegionStates().getOrCreateRegionStateNode(hri); 146 TransitRegionStateProcedure proc = TransitRegionStateProcedure.assign(env, hri, null); 147 ServerName worker = master.getServerManager().getOnlineServersList().get(0); 148 OpenRegionProcedure openRegionProcedure = new OpenRegionProcedure(proc, hri, worker); 149 Future<byte[]> future = submitProcedure(openRegionProcedure); 150 Thread.sleep(2000); 151 rsDispatcher.removeNode(worker); 152 try { 153 future.get(2000, TimeUnit.MILLISECONDS); 154 fail(); 155 } catch (TimeoutException e) { 156 LOG.info("timeout is expected"); 157 } 158 Assert.assertFalse(openRegionProcedure.isFinished()); 159 } 160 161 private Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) { 162 return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc); 163 } 164 165 private static class NoopServerRemoteProcedure extends ServerRemoteProcedure 166 implements ServerProcedureInterface { 167 168 public NoopServerRemoteProcedure(ServerName targetServer) { 169 this.targetServer = targetServer; 170 } 171 172 @Override 173 protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { 174 return; 175 } 176 177 @Override 178 protected boolean abort(MasterProcedureEnv env) { 179 return false; 180 } 181 182 @Override 183 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 184 return; 185 } 186 187 @Override 188 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 189 return; 190 } 191 192 @Override 193 public Optional<RemoteProcedureDispatcher.RemoteOperation> remoteCallBuild( 194 MasterProcedureEnv env, ServerName serverName) { 195 return Optional 196 .of(new RSProcedureDispatcher.ServerOperation(null, 0L, this.getClass(), new byte[0])); 197 } 198 199 @Override 200 public synchronized void remoteOperationCompleted(MasterProcedureEnv env) { 201 complete(env, null); 202 } 203 204 @Override 205 public synchronized void remoteOperationFailed(MasterProcedureEnv env, 206 RemoteProcedureException error) { 207 complete(env, error); 208 } 209 210 @Override 211 public void complete(MasterProcedureEnv env, Throwable error) { 212 this.succ = true; 213 return; 214 } 215 216 @Override 217 public ServerName getServerName() { 218 return targetServer; 219 } 220 221 @Override 222 public boolean hasMetaTableRegion() { 223 return false; 224 } 225 226 @Override 227 public ServerOperationType getServerOperationType() { 228 return SWITCH_RPC_THROTTLE; 229 } 230 231 } 232 233 protected interface MockRSExecutor { 234 AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server, 235 AdminProtos.ExecuteProceduresRequest req) throws IOException; 236 } 237 238 protected static class NoopRSExecutor implements MockRSExecutor { 239 @Override 240 public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server, 241 AdminProtos.ExecuteProceduresRequest req) throws IOException { 242 if (req.getOpenRegionCount() > 0) { 243 for (AdminProtos.OpenRegionRequest request : req.getOpenRegionList()) { 244 for (AdminProtos.OpenRegionRequest.RegionOpenInfo openReq : request.getOpenInfoList()) { 245 execOpenRegion(server, openReq); 246 } 247 } 248 } 249 return AdminProtos.ExecuteProceduresResponse.getDefaultInstance(); 250 } 251 252 protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(ServerName server, 253 AdminProtos.OpenRegionRequest.RegionOpenInfo regionInfo) throws IOException { 254 return null; 255 } 256 } 257 258 protected static class MockRSProcedureDispatcher extends RSProcedureDispatcher { 259 private MockRSExecutor mockRsExec; 260 261 public MockRSProcedureDispatcher(final MasterServices master) { 262 super(master); 263 } 264 265 public void setMockRsExecutor(final MockRSExecutor mockRsExec) { 266 this.mockRsExec = mockRsExec; 267 } 268 269 @Override 270 protected void remoteDispatch(ServerName serverName, 271 @SuppressWarnings("rawtypes") Set<RemoteProcedure> remoteProcedures) { 272 submitTask(new MockRSProcedureDispatcher.MockRemoteCall(serverName, remoteProcedures)); 273 } 274 275 private class MockRemoteCall extends ExecuteProceduresRemoteCall { 276 public MockRemoteCall(final ServerName serverName, 277 @SuppressWarnings("rawtypes") final Set<RemoteProcedure> operations) { 278 super(serverName, operations); 279 } 280 281 @Override 282 protected AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName, 283 final AdminProtos.ExecuteProceduresRequest request) throws IOException { 284 return mockRsExec.sendRequest(serverName, request); 285 } 286 } 287 } 288}