001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SWITCH_RPC_THROTTLE; 021import static org.junit.Assert.fail; 022 023import java.io.IOException; 024import java.util.NavigableMap; 025import java.util.Optional; 026import java.util.Set; 027import java.util.SortedSet; 028import java.util.concurrent.ConcurrentSkipListMap; 029import java.util.concurrent.ExecutorService; 030import java.util.concurrent.Executors; 031import java.util.concurrent.Future; 032import java.util.concurrent.ScheduledExecutorService; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.TimeoutException; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseTestingUtility; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.client.RegionInfo; 040import org.apache.hadoop.hbase.client.RegionInfoBuilder; 041import org.apache.hadoop.hbase.master.MasterServices; 042import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 043import org.apache.hadoop.hbase.master.assignment.MockMasterServices; 044import org.apache.hadoop.hbase.master.assignment.OpenRegionProcedure; 045import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure; 046import org.apache.hadoop.hbase.procedure2.Procedure; 047import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 048import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; 049import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; 050import org.apache.hadoop.hbase.testclassification.MasterTests; 051import org.apache.hadoop.hbase.testclassification.MediumTests; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.junit.After; 054import org.junit.Assert; 055import org.junit.Before; 056import org.junit.ClassRule; 057import org.junit.Rule; 058import org.junit.Test; 059import org.junit.experimental.categories.Category; 060import org.junit.rules.ExpectedException; 061import org.junit.rules.TestName; 062import org.slf4j.Logger; 063import org.slf4j.LoggerFactory; 064 065import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 066 067import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 068 069@Category({ MasterTests.class, MediumTests.class }) 070public class TestServerRemoteProcedure { 071 private static final Logger LOG = LoggerFactory.getLogger(TestServerRemoteProcedure.class); 072 @ClassRule 073 public static final HBaseClassTestRule CLASS_RULE = 074 HBaseClassTestRule.forClass(TestServerRemoteProcedure.class); 075 @Rule 076 public TestName name = new TestName(); 077 @Rule 078 public final ExpectedException exception = ExpectedException.none(); 079 protected HBaseTestingUtility util; 080 protected MockRSProcedureDispatcher rsDispatcher; 081 protected MockMasterServices master; 082 protected AssignmentManager am; 083 protected NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers = 084 new ConcurrentSkipListMap<>(); 085 // Simple executor to run some simple tasks. 086 protected ScheduledExecutorService executor; 087 088 @Before 089 public void setUp() throws Exception { 090 util = new HBaseTestingUtility(); 091 this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() 092 .setUncaughtExceptionHandler((t, e) -> LOG.warn("Uncaught: ", e)).build()); 093 master = new MockMasterServices(util.getConfiguration(), this.regionsToRegionServers); 094 rsDispatcher = new MockRSProcedureDispatcher(master); 095 rsDispatcher.setMockRsExecutor(new NoopRSExecutor()); 096 master.start(2, rsDispatcher); 097 am = master.getAssignmentManager(); 098 master.getServerManager().getOnlineServersList().stream() 099 .forEach(serverName -> am.getRegionStates().getOrCreateServer(serverName)); 100 } 101 102 @After 103 public void tearDown() throws Exception { 104 master.stop("tearDown"); 105 this.executor.shutdownNow(); 106 } 107 108 @Test 109 public void testSplitWALAndCrashBeforeResponse() throws Exception { 110 ServerName worker = master.getServerManager().getOnlineServersList().get(0); 111 ServerName crashedWorker = master.getServerManager().getOnlineServersList().get(1); 112 ServerRemoteProcedure splitWALRemoteProcedure = 113 new SplitWALRemoteProcedure(worker, crashedWorker, "test"); 114 Future<byte[]> future = submitProcedure(splitWALRemoteProcedure); 115 Thread.sleep(2000); 116 master.getServerManager().expireServer(worker); 117 // if remoteCallFailed is called for this procedure, this procedure should be finished. 118 future.get(5000, TimeUnit.MILLISECONDS); 119 Assert.assertTrue(splitWALRemoteProcedure.isSuccess()); 120 } 121 122 @Test 123 public void testRemoteCompleteAndFailedAtTheSameTime() throws Exception { 124 ServerName worker = master.getServerManager().getOnlineServersList().get(0); 125 ServerRemoteProcedure noopServerRemoteProcedure = new NoopServerRemoteProcedure(worker); 126 Future<byte[]> future = submitProcedure(noopServerRemoteProcedure); 127 Thread.sleep(2000); 128 // complete the process and fail the process at the same time 129 ExecutorService threadPool = Executors.newFixedThreadPool(2); 130 threadPool.execute(() -> noopServerRemoteProcedure 131 .remoteOperationDone(master.getMasterProcedureExecutor().getEnvironment(), null)); 132 threadPool.execute(() -> noopServerRemoteProcedure.remoteCallFailed( 133 master.getMasterProcedureExecutor().getEnvironment(), worker, new IOException())); 134 future.get(2000, TimeUnit.MILLISECONDS); 135 Assert.assertTrue(noopServerRemoteProcedure.isSuccess()); 136 } 137 138 @Test 139 public void testRegionOpenProcedureIsNotHandledByDispatcher() throws Exception { 140 TableName tableName = TableName.valueOf("testRegionOpenProcedureIsNotHandledByDisPatcher"); 141 RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(1)) 142 .setEndKey(Bytes.toBytes(2)).setSplit(false).setRegionId(0).build(); 143 MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment(); 144 env.getAssignmentManager().getRegionStates().getOrCreateRegionStateNode(hri); 145 TransitRegionStateProcedure proc = TransitRegionStateProcedure.assign(env, hri, null); 146 ServerName worker = master.getServerManager().getOnlineServersList().get(0); 147 OpenRegionProcedure openRegionProcedure = new OpenRegionProcedure(proc, hri, worker); 148 Future<byte[]> future = submitProcedure(openRegionProcedure); 149 Thread.sleep(2000); 150 rsDispatcher.removeNode(worker); 151 try { 152 future.get(2000, TimeUnit.MILLISECONDS); 153 fail(); 154 } catch (TimeoutException e) { 155 LOG.info("timeout is expected"); 156 } 157 Assert.assertFalse(openRegionProcedure.isFinished()); 158 } 159 160 private Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) { 161 return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc); 162 } 163 164 private static class NoopServerRemoteProcedure extends ServerRemoteProcedure 165 implements ServerProcedureInterface { 166 167 public NoopServerRemoteProcedure(ServerName targetServer) { 168 this.targetServer = targetServer; 169 } 170 171 @Override 172 protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { 173 return; 174 } 175 176 @Override 177 protected boolean abort(MasterProcedureEnv env) { 178 return false; 179 } 180 181 @Override 182 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 183 return; 184 } 185 186 @Override 187 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 188 return; 189 } 190 191 @Override 192 public Optional<RemoteProcedureDispatcher.RemoteOperation> 193 remoteCallBuild(MasterProcedureEnv env, ServerName serverName) { 194 return Optional 195 .of(new RSProcedureDispatcher.ServerOperation(null, 0L, this.getClass(), new byte[0])); 196 } 197 198 @Override 199 public synchronized void remoteOperationCompleted(MasterProcedureEnv env) { 200 complete(env, null); 201 } 202 203 @Override 204 public synchronized void remoteOperationFailed(MasterProcedureEnv env, 205 RemoteProcedureException error) { 206 complete(env, error); 207 } 208 209 @Override 210 public void complete(MasterProcedureEnv env, Throwable error) { 211 this.succ = true; 212 return; 213 } 214 215 @Override 216 public ServerName getServerName() { 217 return targetServer; 218 } 219 220 @Override 221 public boolean hasMetaTableRegion() { 222 return false; 223 } 224 225 @Override 226 public ServerOperationType getServerOperationType() { 227 return SWITCH_RPC_THROTTLE; 228 } 229 230 } 231 232 protected interface MockRSExecutor { 233 AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server, 234 AdminProtos.ExecuteProceduresRequest req) throws IOException; 235 } 236 237 protected static class NoopRSExecutor implements MockRSExecutor { 238 @Override 239 public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server, 240 AdminProtos.ExecuteProceduresRequest req) throws IOException { 241 if (req.getOpenRegionCount() > 0) { 242 for (AdminProtos.OpenRegionRequest request : req.getOpenRegionList()) { 243 for (AdminProtos.OpenRegionRequest.RegionOpenInfo openReq : request.getOpenInfoList()) { 244 execOpenRegion(server, openReq); 245 } 246 } 247 } 248 return AdminProtos.ExecuteProceduresResponse.getDefaultInstance(); 249 } 250 251 protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(ServerName server, 252 AdminProtos.OpenRegionRequest.RegionOpenInfo regionInfo) throws IOException { 253 return null; 254 } 255 } 256 257 protected static class MockRSProcedureDispatcher extends RSProcedureDispatcher { 258 private MockRSExecutor mockRsExec; 259 260 public MockRSProcedureDispatcher(final MasterServices master) { 261 super(master); 262 } 263 264 public void setMockRsExecutor(final MockRSExecutor mockRsExec) { 265 this.mockRsExec = mockRsExec; 266 } 267 268 @Override 269 protected void remoteDispatch(ServerName serverName, 270 @SuppressWarnings("rawtypes") Set<RemoteProcedure> remoteProcedures) { 271 submitTask(new MockRSProcedureDispatcher.MockRemoteCall(serverName, remoteProcedures)); 272 } 273 274 private class MockRemoteCall extends ExecuteProceduresRemoteCall { 275 public MockRemoteCall(final ServerName serverName, 276 @SuppressWarnings("rawtypes") final Set<RemoteProcedure> operations) { 277 super(serverName, operations); 278 } 279 280 @Override 281 protected AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName, 282 final AdminProtos.ExecuteProceduresRequest request) throws IOException { 283 return mockRsExec.sendRequest(serverName, request); 284 } 285 } 286 } 287}