001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.assignment; 019 020import static org.junit.Assert.assertNotEquals; 021 022import java.io.IOException; 023import java.io.InterruptedIOException; 024import java.net.SocketTimeoutException; 025import java.util.List; 026import java.util.NavigableMap; 027import java.util.Random; 028import java.util.Set; 029import java.util.SortedSet; 030import java.util.concurrent.ConcurrentSkipListMap; 031import java.util.concurrent.ConcurrentSkipListSet; 032import java.util.concurrent.ExecutionException; 033import java.util.concurrent.Executors; 034import java.util.concurrent.Future; 035import java.util.concurrent.ScheduledExecutorService; 036import java.util.concurrent.TimeUnit; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.HBaseTestingUtility; 039import org.apache.hadoop.hbase.NotServingRegionException; 040import org.apache.hadoop.hbase.ServerName; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.client.RegionInfo; 043import org.apache.hadoop.hbase.client.RegionInfoBuilder; 044import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 045import org.apache.hadoop.hbase.master.MasterServices; 046import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; 047import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 048import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; 049import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher; 050import org.apache.hadoop.hbase.procedure2.Procedure; 051import org.apache.hadoop.hbase.procedure2.ProcedureMetrics; 052import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; 053import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; 054import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 055import org.apache.hadoop.hbase.util.Bytes; 056import org.apache.hadoop.hbase.util.FSUtils; 057import org.apache.hadoop.ipc.RemoteException; 058import org.junit.After; 059import org.junit.Before; 060import org.junit.Rule; 061import org.junit.rules.ExpectedException; 062import org.junit.rules.TestName; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 067import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 068import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; 069import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 070import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 071import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 072import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; 073import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; 074import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; 076import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; 077import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; 078 079/** 080 * Base class for AM test 081 */ 082public class TestAssignmentManagerBase { 083 private static final Logger LOG = LoggerFactory.getLogger(TestAssignmentManager.class); 084 085 @Rule public TestName name = new TestName(); 086 @Rule public final ExpectedException exception = ExpectedException.none(); 087 088 private static final int PROC_NTHREADS = 64; 089 protected static final int NREGIONS = 1 * 1000; 090 protected static final int NSERVERS = Math.max(1, NREGIONS / 100); 091 092 protected HBaseTestingUtility UTIL; 093 protected MockRSProcedureDispatcher rsDispatcher; 094 protected MockMasterServices master; 095 protected AssignmentManager am; 096 protected NavigableMap<ServerName, SortedSet<byte []>> regionsToRegionServers = 097 new ConcurrentSkipListMap<>(); 098 // Simple executor to run some simple tasks. 099 protected ScheduledExecutorService executor; 100 101 protected ProcedureMetrics assignProcMetrics; 102 protected ProcedureMetrics unassignProcMetrics; 103 104 protected long assignSubmittedCount = 0; 105 protected long assignFailedCount = 0; 106 protected long unassignSubmittedCount = 0; 107 protected long unassignFailedCount = 0; 108 109 protected void setupConfiguration(Configuration conf) throws Exception { 110 FSUtils.setRootDir(conf, UTIL.getDataTestDir()); 111 conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false); 112 conf.setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 10); 113 conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS); 114 conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000); 115 conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, 100); // Have many so we succeed eventually. 116 } 117 118 @Before 119 public void setUp() throws Exception { 120 UTIL = new HBaseTestingUtility(); 121 this.executor = Executors.newSingleThreadScheduledExecutor(); 122 setupConfiguration(UTIL.getConfiguration()); 123 master = new MockMasterServices(UTIL.getConfiguration(), this.regionsToRegionServers); 124 rsDispatcher = new MockRSProcedureDispatcher(master); 125 master.start(NSERVERS, rsDispatcher); 126 am = master.getAssignmentManager(); 127 assignProcMetrics = am.getAssignmentManagerMetrics().getAssignProcMetrics(); 128 unassignProcMetrics = am.getAssignmentManagerMetrics().getUnassignProcMetrics(); 129 setUpMeta(); 130 } 131 132 private void setUpMeta() throws Exception { 133 rsDispatcher.setMockRsExecutor(new GoodRsExecutor()); 134 am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO); 135 am.wakeMetaLoadedEvent(); 136 } 137 138 @After 139 public void tearDown() throws Exception { 140 master.stop("tearDown"); 141 this.executor.shutdownNow(); 142 } 143 144 protected Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) { 145 return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc); 146 } 147 148 protected byte[] waitOnFuture(final Future<byte[]> future) throws Exception { 149 try { 150 return future.get(5, TimeUnit.SECONDS); 151 } catch (ExecutionException e) { 152 LOG.info("ExecutionException", e); 153 Exception ee = (Exception)e.getCause(); 154 if (ee instanceof InterruptedIOException) { 155 for (Procedure<?> p: this.master.getMasterProcedureExecutor().getProcedures()) { 156 LOG.info(p.toStringDetails()); 157 } 158 } 159 throw (Exception)e.getCause(); 160 } 161 } 162 163 // ============================================================================================ 164 // Helpers 165 // ============================================================================================ 166 protected void bulkSubmit(final AssignProcedure[] procs) throws Exception { 167 final Thread[] threads = new Thread[PROC_NTHREADS]; 168 for (int i = 0; i < threads.length; ++i) { 169 final int threadId = i; 170 threads[i] = new Thread() { 171 @Override 172 public void run() { 173 TableName tableName = TableName.valueOf("table-" + threadId); 174 int n = (procs.length / threads.length); 175 int start = threadId * n; 176 int stop = start + n; 177 for (int j = start; j < stop; ++j) { 178 procs[j] = createAndSubmitAssign(tableName, j); 179 } 180 } 181 }; 182 threads[i].start(); 183 } 184 for (int i = 0; i < threads.length; ++i) { 185 threads[i].join(); 186 } 187 for (int i = procs.length - 1; i >= 0 && procs[i] == null; --i) { 188 procs[i] = createAndSubmitAssign(TableName.valueOf("table-sync"), i); 189 } 190 } 191 192 private AssignProcedure createAndSubmitAssign(TableName tableName, int regionId) { 193 RegionInfo hri = createRegionInfo(tableName, regionId); 194 AssignProcedure proc = am.createAssignProcedure(hri); 195 master.getMasterProcedureExecutor().submitProcedure(proc); 196 return proc; 197 } 198 199 protected RegionInfo createRegionInfo(final TableName tableName, final long regionId) { 200 return RegionInfoBuilder.newBuilder(tableName) 201 .setStartKey(Bytes.toBytes(regionId)) 202 .setEndKey(Bytes.toBytes(regionId + 1)) 203 .setSplit(false) 204 .setRegionId(0) 205 .build(); 206 } 207 208 private void sendTransitionReport(final ServerName serverName, 209 final org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo regionInfo, 210 final TransitionCode state) throws IOException { 211 ReportRegionStateTransitionRequest.Builder req = 212 ReportRegionStateTransitionRequest.newBuilder(); 213 req.setServer(ProtobufUtil.toServerName(serverName)); 214 req.addTransition(RegionStateTransition.newBuilder() 215 .addRegionInfo(regionInfo) 216 .setTransitionCode(state) 217 .setOpenSeqNum(1) 218 .build()); 219 am.reportRegionStateTransition(req.build()); 220 } 221 222 private void doCrash(final ServerName serverName) { 223 this.am.submitServerCrash(serverName, false/*No WALs here*/); 224 } 225 226 private void doRestart(final ServerName serverName) { 227 try { 228 this.master.restartRegionServer(serverName); 229 } catch (IOException e) { 230 LOG.warn("Can not restart RS with new startcode"); 231 } 232 } 233 234 protected class NoopRsExecutor implements MockRSExecutor { 235 @Override 236 public ExecuteProceduresResponse sendRequest(ServerName server, 237 ExecuteProceduresRequest request) throws IOException { 238 ExecuteProceduresResponse.Builder builder = ExecuteProceduresResponse.newBuilder(); 239 if (request.getOpenRegionCount() > 0) { 240 for (OpenRegionRequest req: request.getOpenRegionList()) { 241 OpenRegionResponse.Builder resp = OpenRegionResponse.newBuilder(); 242 for (RegionOpenInfo openReq: req.getOpenInfoList()) { 243 RegionOpeningState state = execOpenRegion(server, openReq); 244 if (state != null) { 245 resp.addOpeningState(state); 246 } 247 } 248 builder.addOpenRegion(resp.build()); 249 } 250 } 251 if (request.getCloseRegionCount() > 0) { 252 for (CloseRegionRequest req: request.getCloseRegionList()) { 253 CloseRegionResponse resp = execCloseRegion(server, 254 req.getRegion().getValue().toByteArray()); 255 if (resp != null) { 256 builder.addCloseRegion(resp); 257 } 258 } 259 } 260 return ExecuteProceduresResponse.newBuilder().build(); 261 } 262 263 protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo regionInfo) 264 throws IOException { 265 return null; 266 } 267 268 protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) 269 throws IOException { 270 return null; 271 } 272 } 273 274 protected class GoodRsExecutor extends NoopRsExecutor { 275 @Override 276 protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo openReq) 277 throws IOException { 278 sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED); 279 // Concurrency? 280 // Now update the state of our cluster in regionsToRegionServers. 281 SortedSet<byte []> regions = regionsToRegionServers.get(server); 282 if (regions == null) { 283 regions = new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR); 284 regionsToRegionServers.put(server, regions); 285 } 286 RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion()); 287 if (regions.contains(hri.getRegionName())) { 288 throw new UnsupportedOperationException(hri.getRegionNameAsString()); 289 } 290 regions.add(hri.getRegionName()); 291 return RegionOpeningState.OPENED; 292 } 293 294 @Override 295 protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) 296 throws IOException { 297 RegionInfo hri = am.getRegionInfo(regionName); 298 sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED); 299 return CloseRegionResponse.newBuilder().setClosed(true).build(); 300 } 301 } 302 303 protected static class ServerNotYetRunningRsExecutor implements MockRSExecutor { 304 @Override 305 public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) 306 throws IOException { 307 throw new ServerNotRunningYetException("wait on server startup"); 308 } 309 } 310 311 protected static class FaultyRsExecutor implements MockRSExecutor { 312 private final IOException exception; 313 314 public FaultyRsExecutor(final IOException exception) { 315 this.exception = exception; 316 } 317 318 @Override 319 public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) 320 throws IOException { 321 throw exception; 322 } 323 } 324 325 protected class SocketTimeoutRsExecutor extends GoodRsExecutor { 326 private final int maxSocketTimeoutRetries; 327 private final int maxServerRetries; 328 329 private ServerName lastServer; 330 private int sockTimeoutRetries; 331 private int serverRetries; 332 333 public SocketTimeoutRsExecutor(int maxSocketTimeoutRetries, int maxServerRetries) { 334 this.maxServerRetries = maxServerRetries; 335 this.maxSocketTimeoutRetries = maxSocketTimeoutRetries; 336 } 337 338 @Override 339 public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) 340 throws IOException { 341 // SocketTimeoutException should be a temporary problem 342 // unless the server will be declared dead. 343 if (sockTimeoutRetries++ < maxSocketTimeoutRetries) { 344 if (sockTimeoutRetries == 1) assertNotEquals(lastServer, server); 345 lastServer = server; 346 LOG.debug("Socket timeout for server=" + server + " retries=" + sockTimeoutRetries); 347 throw new SocketTimeoutException("simulate socket timeout"); 348 } else if (serverRetries++ < maxServerRetries) { 349 LOG.info("Mark server=" + server + " as dead. serverRetries=" + serverRetries); 350 master.getServerManager().moveFromOnlineToDeadServers(server); 351 sockTimeoutRetries = 0; 352 throw new SocketTimeoutException("simulate socket timeout"); 353 } else { 354 return super.sendRequest(server, req); 355 } 356 } 357 } 358 359 /** 360 * Takes open request and then returns nothing so acts like a RS that went zombie. 361 * No response (so proc is stuck/suspended on the Master and won't wake up.). We 362 * then send in a crash for this server after a few seconds; crash is supposed to 363 * take care of the suspended procedures. 364 */ 365 protected class HangThenRSCrashExecutor extends GoodRsExecutor { 366 private int invocations; 367 368 @Override 369 protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq) 370 throws IOException { 371 if (this.invocations++ > 0) { 372 // Return w/o problem the second time through here. 373 return super.execOpenRegion(server, openReq); 374 } 375 // The procedure on master will just hang forever because nothing comes back 376 // from the RS in this case. 377 LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout"); 378 executor.schedule(new Runnable() { 379 @Override 380 public void run() { 381 LOG.info("Sending in CRASH of " + server); 382 doCrash(server); 383 } 384 }, 1, TimeUnit.SECONDS); 385 return null; 386 } 387 } 388 389 /** 390 * Takes open request and then returns nothing so acts like a RS that went zombie. 391 * No response (so proc is stuck/suspended on the Master and won't wake up.). 392 * Different with HangThenRSCrashExecutor, HangThenRSCrashExecutor will create 393 * ServerCrashProcedure to handle the server crash. However, this HangThenRSRestartExecutor 394 * will restart RS directly, situation for RS crashed when SCP is not enabled. 395 */ 396 protected class HangThenRSRestartExecutor extends GoodRsExecutor { 397 private int invocations; 398 399 @Override 400 protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq) 401 throws IOException { 402 if (this.invocations++ > 0) { 403 // Return w/o problem the second time through here. 404 return super.execOpenRegion(server, openReq); 405 } 406 // The procedure on master will just hang forever because nothing comes back 407 // from the RS in this case. 408 LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout"); 409 executor.schedule(new Runnable() { 410 @Override 411 public void run() { 412 LOG.info("Restarting RS of " + server); 413 doRestart(server); 414 } 415 }, 1, TimeUnit.SECONDS); 416 return null; 417 } 418 } 419 420 protected class HangOnCloseThenRSCrashExecutor extends GoodRsExecutor { 421 public static final int TYPES_OF_FAILURE = 6; 422 private int invocations; 423 424 @Override 425 protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) 426 throws IOException { 427 switch (this.invocations++) { 428 case 0: throw new NotServingRegionException("Fake"); 429 case 1: 430 executor.schedule(new Runnable() { 431 @Override 432 public void run() { 433 LOG.info("Sending in CRASH of " + server); 434 doCrash(server); 435 } 436 }, 1, TimeUnit.SECONDS); 437 throw new RegionServerAbortedException("Fake!"); 438 case 2: 439 executor.schedule(new Runnable() { 440 @Override 441 public void run() { 442 LOG.info("Sending in CRASH of " + server); 443 doCrash(server); 444 } 445 }, 1, TimeUnit.SECONDS); 446 throw new RegionServerStoppedException("Fake!"); 447 case 3: throw new ServerNotRunningYetException("Fake!"); 448 case 4: 449 LOG.info("Returned null from serverName={}; means STUCK...TODO timeout", server); 450 executor.schedule(new Runnable() { 451 @Override 452 public void run() { 453 LOG.info("Sending in CRASH of " + server); 454 doCrash(server); 455 } 456 }, 1, TimeUnit.SECONDS); 457 return null; 458 default: 459 return super.execCloseRegion(server, regionName); 460 } 461 } 462 } 463 464 protected class RandRsExecutor extends NoopRsExecutor { 465 private final Random rand = new Random(); 466 467 @Override 468 public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) 469 throws IOException { 470 switch (rand.nextInt(5)) { 471 case 0: throw new ServerNotRunningYetException("wait on server startup"); 472 case 1: throw new SocketTimeoutException("simulate socket timeout"); 473 case 2: throw new RemoteException("java.io.IOException", "unexpected exception"); 474 } 475 return super.sendRequest(server, req); 476 } 477 478 @Override 479 protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq) 480 throws IOException { 481 switch (rand.nextInt(6)) { 482 case 0: 483 LOG.info("Return OPENED response"); 484 sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED); 485 return OpenRegionResponse.RegionOpeningState.OPENED; 486 case 1: 487 LOG.info("Return transition report that OPENED/ALREADY_OPENED response"); 488 sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED); 489 return OpenRegionResponse.RegionOpeningState.ALREADY_OPENED; 490 case 2: 491 LOG.info("Return transition report that FAILED_OPEN/FAILED_OPENING response"); 492 sendTransitionReport(server, openReq.getRegion(), TransitionCode.FAILED_OPEN); 493 return OpenRegionResponse.RegionOpeningState.FAILED_OPENING; 494 } 495 // The procedure on master will just hang forever because nothing comes back 496 // from the RS in this case. 497 LOG.info("Return null as response; means proc stuck so we send in a crash report after a few seconds..."); 498 executor.schedule(new Runnable() { 499 @Override 500 public void run() { 501 LOG.info("Delayed CRASHING of " + server); 502 doCrash(server); 503 } 504 }, 5, TimeUnit.SECONDS); 505 return null; 506 } 507 508 @Override 509 protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) 510 throws IOException { 511 CloseRegionResponse.Builder resp = CloseRegionResponse.newBuilder(); 512 boolean closed = rand.nextBoolean(); 513 if (closed) { 514 RegionInfo hri = am.getRegionInfo(regionName); 515 sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED); 516 } 517 resp.setClosed(closed); 518 return resp.build(); 519 } 520 } 521 522 protected interface MockRSExecutor { 523 ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) 524 throws IOException; 525 } 526 527 protected class MockRSProcedureDispatcher extends RSProcedureDispatcher { 528 private MockRSExecutor mockRsExec; 529 530 public MockRSProcedureDispatcher(final MasterServices master) { 531 super(master); 532 } 533 534 public void setMockRsExecutor(final MockRSExecutor mockRsExec) { 535 this.mockRsExec = mockRsExec; 536 } 537 538 @Override 539 protected void remoteDispatch(ServerName serverName, Set<RemoteProcedure> remoteProcedures) { 540 submitTask(new MockRemoteCall(serverName, remoteProcedures)); 541 } 542 543 private class MockRemoteCall extends ExecuteProceduresRemoteCall { 544 public MockRemoteCall(final ServerName serverName, final Set<RemoteProcedure> operations) { 545 super(serverName, operations); 546 } 547 548 @Override 549 public void dispatchOpenRequests(MasterProcedureEnv env, 550 List<RegionOpenOperation> operations) { 551 request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations)); 552 } 553 554 @Override 555 public void dispatchCloseRequests(MasterProcedureEnv env, 556 List<RegionCloseOperation> operations) { 557 for (RegionCloseOperation op : operations) { 558 request.addCloseRegion(op.buildCloseRegionRequest(getServerName())); 559 } 560 } 561 562 @Override 563 protected ExecuteProceduresResponse sendRequest(final ServerName serverName, 564 final ExecuteProceduresRequest request) throws IOException { 565 return mockRsExec.sendRequest(serverName, request); 566 } 567 } 568 } 569 570 protected void collectAssignmentManagerMetrics() { 571 assignSubmittedCount = assignProcMetrics.getSubmittedCounter().getCount(); 572 assignFailedCount = assignProcMetrics.getFailedCounter().getCount(); 573 unassignSubmittedCount = unassignProcMetrics.getSubmittedCounter().getCount(); 574 unassignFailedCount = unassignProcMetrics.getFailedCounter().getCount(); 575 } 576}