/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2.store.wal;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.io.IOUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value;

/**
 * Tests for {@link WALProcedureStore}: rolling and cleaning of WAL files, reloading procedures
 * after a store restart, and loading from logs that were truncated or removed.
 */
@Tag(MasterTests.TAG)
@Tag(SmallTests.TAG)
public class TestWALProcedureStore {

  private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStore.class);

  private static final int PROCEDURE_STORE_SLOTS = 1;

  private WALProcedureStore procStore;

  private final HBaseCommonTestingUtil htu = new HBaseCommonTestingUtil();
  private FileSystem fs;
  private Path testDir;
  private Path logDir;

  /** Enables WAL cleanup on load; individual tests switch it off again when they need to. */
  private void setupConfig(final Configuration conf) {
    conf.setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, true);
  }

  /**
   * Creates a fresh store under {@code <testDir>/proc-logs}, starts it and performs an initial
   * (empty) load. Also resets the {@link TestSequentialProcedure} id sequence so that every test
   * sees procedure ids starting from 1.
   */
  @BeforeEach
  public void setUp() throws IOException {
    testDir = htu.getDataTestDir();
    htu.getConfiguration().set(HConstants.HBASE_DIR, testDir.toString());
    fs = testDir.getFileSystem(htu.getConfiguration());
    assertTrue(testDir.depth() > 1);

    TestSequentialProcedure.seqId.set(0);
    setupConfig(htu.getConfiguration());
    logDir = new Path(testDir, "proc-logs");
    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
    procStore.start(PROCEDURE_STORE_SLOTS);
    procStore.recoverLease();
    procStore.load(new LoadCounter());
  }

  /** Stops the store held in {@link #procStore} and removes the log directory. */
  @AfterEach
  public void tearDown() throws IOException {
    procStore.stop(false);
    fs.delete(logDir, true);
  }

  private void storeRestart(ProcedureStore.ProcedureLoader loader) throws Exception {
    ProcedureTestingUtility.storeRestart(procStore, loader);
  }

  @Test
  public void testEmptyRoll() throws Exception {
    for (int i = 0; i < 10; ++i) {
      procStore.periodicRollForTesting();
    }
    assertEquals(1, procStore.getActiveLogs().size());
    FileStatus[] status = fs.listStatus(logDir);
    assertEquals(1, status.length);
  }

  @Test
  public void testRestartWithoutData() throws Exception {
    for (int i = 0; i < 10; ++i) {
      final LoadCounter loader = new LoadCounter();
      storeRestart(loader);
    }
    LOG.info("ACTIVE WALs {}", procStore.getActiveLogs());
    assertEquals(1, procStore.getActiveLogs().size());
    FileStatus[] status = fs.listStatus(logDir);
    assertEquals(1, status.length);
  }

  /**
   * Tests that tracker for all old logs are loaded back after procedure store is restarted.
   */
  @Test
  public void trackersLoadedForAllOldLogs() throws Exception {
    for (int i = 0; i <= 20; ++i) {
      procStore.insert(new TestProcedure(i), null);
      if (i > 0 && (i % 5) == 0) {
        LoadCounter loader = new LoadCounter();
        storeRestart(loader);
      }
    }
    final ArrayList<ProcedureWALFile> activeLogs = procStore.getActiveLogs();
    assertEquals(5, activeLogs.size());
    // The last entry is the log currently being written, so only the older ones are checked.
    for (int i = 0; i < activeLogs.size() - 1; ++i) {
      ProcedureStoreTracker tracker = activeLogs.get(i).getTracker();
      assertNotNull(tracker, "Missing tracker for " + activeLogs.get(i));
      assertFalse(tracker.isEmpty(), "Empty tracker for " + activeLogs.get(i));
    }
  }

  @Test
  public void testWalCleanerSequentialClean() throws Exception {
    final Procedure<?>[] procs = new Procedure[5];
    ArrayList<ProcedureWALFile> logs = null;

    // Insert procedures and roll wal after every insert.
    for (int i = 0; i < procs.length; i++) {
      procs[i] = new TestSequentialProcedure();
      procStore.insert(procs[i], null);
      procStore.rollWriterForTesting();
      logs = procStore.getActiveLogs();
      assertEquals(i + 2, logs.size()); // Extra 1 for current ongoing wal.
    }

    // Delete procedures in sequential order make sure that only the corresponding wal is deleted
    // from logs list.
    final int[] deleteOrder = new int[] { 0, 1, 2, 3, 4 };
    for (int i = 0; i < deleteOrder.length; i++) {
      procStore.delete(procs[deleteOrder[i]].getProcId());
      procStore.removeInactiveLogsForTesting();
      assertFalse(procStore.getActiveLogs().contains(logs.get(deleteOrder[i])),
        logs.get(deleteOrder[i]).toString());
      assertEquals(procs.length - i, procStore.getActiveLogs().size());
    }
  }

  /**
   * Test that wal cleaner doesn't create holes in wal files list i.e. it only deletes files if
   * they are in the starting of the list.
   */
  @Test
  public void testWalCleanerNoHoles() throws Exception {
    final Procedure<?>[] procs = new Procedure[5];
    ArrayList<ProcedureWALFile> logs = null;
    // Insert procedures and roll wal after every insert.
    for (int i = 0; i < procs.length; i++) {
      procs[i] = new TestSequentialProcedure();
      procStore.insert(procs[i], null);
      procStore.rollWriterForTesting();
      logs = procStore.getActiveLogs();
      assertEquals(i + 2, logs.size()); // Extra 1 for current ongoing wal.
    }

    // Deleting everything but the first procedure must not remove any log ...
    for (int i = 1; i < procs.length; i++) {
      procStore.delete(procs[i].getProcId());
    }
    assertEquals(procs.length + 1, procStore.getActiveLogs().size());
    // ... and deleting the first one leaves only a single log.
    procStore.delete(procs[0].getProcId());
    assertEquals(1, procStore.getActiveLogs().size());
  }

  @Test
  public void testWalCleanerUpdates() throws Exception {
    TestSequentialProcedure p1 = new TestSequentialProcedure();
    TestSequentialProcedure p2 = new TestSequentialProcedure();
    procStore.insert(p1, null);
    procStore.insert(p2, null);
    procStore.rollWriterForTesting();
    ProcedureWALFile firstLog = procStore.getActiveLogs().get(0);
    procStore.update(p1);
    procStore.rollWriterForTesting();
    procStore.update(p2);
    procStore.rollWriterForTesting();
    procStore.removeInactiveLogsForTesting();
    assertFalse(procStore.getActiveLogs().contains(firstLog));
  }

  @Test
  public void testWalCleanerUpdatesDontLeaveHoles() throws Exception {
    TestSequentialProcedure p1 = new TestSequentialProcedure();
    TestSequentialProcedure p2 = new TestSequentialProcedure();
    procStore.insert(p1, null);
    procStore.insert(p2, null);
    procStore.rollWriterForTesting(); // generates first log with p1 + p2
    ProcedureWALFile log1 = procStore.getActiveLogs().get(0);
    procStore.update(p2);
    procStore.rollWriterForTesting(); // generates second log with p2
    ProcedureWALFile log2 = procStore.getActiveLogs().get(1);
    procStore.update(p2);
    procStore.rollWriterForTesting(); // generates third log with p2
    procStore.removeInactiveLogsForTesting(); // Shouldn't remove 2nd log.
    assertEquals(4, procStore.getActiveLogs().size());
    procStore.update(p1);
    procStore.rollWriterForTesting(); // generates fourth log with p1
    procStore.removeInactiveLogsForTesting(); // Should remove first two logs.
    assertEquals(3, procStore.getActiveLogs().size());
    assertFalse(procStore.getActiveLogs().contains(log1));
    assertFalse(procStore.getActiveLogs().contains(log2));
  }

  @Test
  public void testWalCleanerWithEmptyRolls() throws Exception {
    final Procedure<?>[] procs = new Procedure[3];
    for (int i = 0; i < procs.length; ++i) {
      procs[i] = new TestSequentialProcedure();
      procStore.insert(procs[i], null);
    }
    assertEquals(1, procStore.getActiveLogs().size());
    procStore.rollWriterForTesting();
    assertEquals(2, procStore.getActiveLogs().size());
    procStore.rollWriterForTesting();
    assertEquals(3, procStore.getActiveLogs().size());

    for (int i = 0; i < procs.length; ++i) {
      procStore.update(procs[i]);
      procStore.rollWriterForTesting();
      procStore.rollWriterForTesting();
      if (i < (procs.length - 1)) {
        assertEquals(3 + ((i + 1) * 2), procStore.getActiveLogs().size());
      }
    }
    assertEquals(7, procStore.getActiveLogs().size());

    for (int i = 0; i < procs.length; ++i) {
      procStore.delete(procs[i].getProcId());
      assertEquals(7 - ((i + 1) * 2), procStore.getActiveLogs().size());
    }
    assertEquals(1, procStore.getActiveLogs().size());
  }

  @Test
  public void testEmptyLogLoad() throws Exception {
    LoadCounter loader = new LoadCounter();
    storeRestart(loader);
    assertEquals(0, loader.getMaxProcId());
    assertEquals(0, loader.getLoadedCount());
    assertEquals(0, loader.getCorruptedCount());
  }

  @Test
  public void testLoad() throws Exception {
    Set<Long> procIds = new HashSet<>();

    // Insert something in the log
    Procedure<?> proc1 = new TestSequentialProcedure();
    procIds.add(proc1.getProcId());
    procStore.insert(proc1, null);

    Procedure<?> proc2 = new TestSequentialProcedure();
    Procedure<?>[] child2 = new Procedure[2];
    child2[0] = new TestSequentialProcedure();
    child2[1] = new TestSequentialProcedure();

    procIds.add(proc2.getProcId());
    procIds.add(child2[0].getProcId());
    procIds.add(child2[1].getProcId());
    procStore.insert(proc2, child2);

    // Verify that everything is there
    verifyProcIdsOnRestart(procIds);

    // Update and delete something
    procStore.update(proc1);
    procStore.update(child2[1]);
    procStore.delete(child2[1].getProcId());
    procIds.remove(child2[1].getProcId());

    // Verify that everything is there
    verifyProcIdsOnRestart(procIds);

    // Remove 4 bytes from the trailers
    procStore.stop(false);
    FileStatus[] logs = fs.listStatus(logDir);
    assertEquals(3, logs.length);
    for (int i = 0; i < logs.length; ++i) {
      corruptLog(logs[i], 4);
    }
    verifyProcIdsOnRestart(procIds);
  }

  @Test
  public void testNoTrailerDoubleRestart() throws Exception {
    // log-0001: proc 0, 1 and 2 are inserted
    Procedure<?> proc0 = new TestSequentialProcedure();
    procStore.insert(proc0, null);
    Procedure<?> proc1 = new TestSequentialProcedure();
    procStore.insert(proc1, null);
    Procedure<?> proc2 = new TestSequentialProcedure();
    procStore.insert(proc2, null);
    procStore.rollWriterForTesting();

    // log-0002: proc 1 deleted
    procStore.delete(proc1.getProcId());
    procStore.rollWriterForTesting();

    // log-0003: proc 2 is updated
    procStore.update(proc2);
    procStore.rollWriterForTesting();

    // log-0004: proc 2 deleted
    procStore.delete(proc2.getProcId());

    // stop the store and remove the trailer
    procStore.stop(false);
    FileStatus[] logs = fs.listStatus(logDir);
    assertEquals(4, logs.length);
    for (int i = 0; i < logs.length; ++i) {
      corruptLog(logs[i], 4);
    }

    // Test Load 1
    // Restart the store (avoid cleaning up the files, to check the rebuilt trackers)
    htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false);
    LoadCounter loader = new LoadCounter();
    storeRestart(loader);
    assertEquals(1, loader.getLoadedCount());
    assertEquals(0, loader.getCorruptedCount());

    // Test Load 2
    assertEquals(5, fs.listStatus(logDir).length);
    loader = new LoadCounter();
    storeRestart(loader);
    assertEquals(1, loader.getLoadedCount());
    assertEquals(0, loader.getCorruptedCount());

    // remove proc-0
    procStore.delete(proc0.getProcId());
    procStore.periodicRollForTesting();
    assertEquals(1, fs.listStatus(logDir).length);
    storeRestart(loader);
  }

  @Test
  public void testProcIdHoles() throws Exception {
    // Insert
    for (int i = 0; i < 100; i += 2) {
      procStore.insert(new TestProcedure(i), null);
      if (i > 0 && (i % 10) == 0) {
        LoadCounter loader = new LoadCounter();
        storeRestart(loader);
        assertEquals(0, loader.getCorruptedCount());
        assertEquals((i / 2) + 1, loader.getLoadedCount());
      }
    }
    assertEquals(10, procStore.getActiveLogs().size());

    // Delete
    for (int i = 0; i < 100; i += 2) {
      procStore.delete(i);
    }
    assertEquals(1, procStore.getActiveLogs().size());

    LoadCounter loader = new LoadCounter();
    storeRestart(loader);
    assertEquals(0, loader.getLoadedCount());
    assertEquals(0, loader.getCorruptedCount());
  }

  @Test
  public void testCorruptedTrailer() throws Exception {
    // Insert something
    for (int i = 0; i < 100; ++i) {
      procStore.insert(new TestSequentialProcedure(), null);
    }

    // Stop the store
    procStore.stop(false);

    // Remove 4 bytes from the trailer
    FileStatus[] logs = fs.listStatus(logDir);
    assertEquals(1, logs.length);
    corruptLog(logs[0], 4);

    LoadCounter loader = new LoadCounter();
    storeRestart(loader);
    assertEquals(100, loader.getLoadedCount());
    assertEquals(0, loader.getCorruptedCount());
  }

  /**
   * Asserts which procedures the given tracker reports as modified.
   * @param tracker         tracker to inspect
   * @param procs           procedures that the index arrays refer to
   * @param updatedProcs    indexes into {@code procs} that must be reported as modified
   * @param nonUpdatedProcs indexes into {@code procs} that must not be reported as modified
   */
  private static void assertUpdated(final ProcedureStoreTracker tracker, final Procedure<?>[] procs,
    final int[] updatedProcs, final int[] nonUpdatedProcs) {
    for (int index : updatedProcs) {
      long procId = procs[index].getProcId();
      assertTrue(tracker.isModified(procId), "Procedure id : " + procId);
    }
    for (int index : nonUpdatedProcs) {
      long procId = procs[index].getProcId();
      assertFalse(tracker.isModified(procId), "Procedure id : " + procId);
    }
  }

  /**
   * Asserts the delete state the given tracker reports for each procedure.
   * @param tracker         tracker to inspect
   * @param procs           procedures that the index arrays refer to
   * @param deletedProcs    indexes into {@code procs} expected to be in state {@code YES}
   * @param nonDeletedProcs indexes into {@code procs} expected to be in state {@code NO}
   */
  private static void assertDeleted(final ProcedureStoreTracker tracker, final Procedure<?>[] procs,
    final int[] deletedProcs, final int[] nonDeletedProcs) {
    for (int index : deletedProcs) {
      long procId = procs[index].getProcId();
      assertEquals(ProcedureStoreTracker.DeleteState.YES, tracker.isDeleted(procId),
        "Procedure id : " + procId);
    }
    for (int index : nonDeletedProcs) {
      long procId = procs[index].getProcId();
      assertEquals(ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(procId),
        "Procedure id : " + procId);
    }
  }

  @Test
  public void testCorruptedTrailersRebuild() throws Exception {
    final Procedure<?>[] procs = new Procedure[6];
    for (int i = 0; i < procs.length; ++i) {
      procs[i] = new TestSequentialProcedure();
    }
    // Log State (I=insert, U=updated, D=delete)
    //   | log 1 | log 2 | log 3 |
    // 0 | I, D  |       |       |
    // 1 | I     |       |       |
    // 2 | I     | D     |       |
    // 3 | I     | U     |       |
    // 4 |       | I     | D     |
    // 5 |       |       | I     |
    procStore.insert(procs[0], null);
    procStore.insert(procs[1], null);
    procStore.insert(procs[2], null);
    procStore.insert(procs[3], null);
    procStore.delete(procs[0].getProcId());
    procStore.rollWriterForTesting();
    procStore.delete(procs[2].getProcId());
    procStore.update(procs[3]);
    procStore.insert(procs[4], null);
    procStore.rollWriterForTesting();
    procStore.delete(procs[4].getProcId());
    procStore.insert(procs[5], null);

    // Stop the store
    procStore.stop(false);

    // Remove 4 bytes from the trailers
    final FileStatus[] logs = fs.listStatus(logDir);
    assertEquals(3, logs.length);
    for (int i = 0; i < logs.length; ++i) {
      corruptLog(logs[i], 4);
    }

    // Restart the store (avoid cleaning up the files, to check the rebuilt trackers)
    htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false);
    final LoadCounter loader = new LoadCounter();
    storeRestart(loader);
    assertEquals(3, loader.getLoadedCount()); // procs 1, 3 and 5
    assertEquals(0, loader.getCorruptedCount());

    // Check the Trackers
    final ArrayList<ProcedureWALFile> walFiles = procStore.getActiveLogs();
    LOG.info("WALs {}", walFiles);
    assertEquals(4, walFiles.size());
    LOG.info("Checking wal {}", walFiles.get(0));
    assertUpdated(walFiles.get(0).getTracker(), procs, new int[] { 0, 1, 2, 3 },
      new int[] { 4, 5 });
    LOG.info("Checking wal {}", walFiles.get(1));
    assertUpdated(walFiles.get(1).getTracker(), procs, new int[] { 2, 3, 4 },
      new int[] { 0, 1, 5 });
    LOG.info("Checking wal {}", walFiles.get(2));
    assertUpdated(walFiles.get(2).getTracker(), procs, new int[] { 4, 5 },
      new int[] { 0, 1, 2, 3 });
    LOG.info("Checking global tracker ");
    assertDeleted(procStore.getStoreTracker(), procs, new int[] { 0, 2, 4 }, new int[] { 1, 3, 5 });
  }

  @Test
  public void testCorruptedEntries() throws Exception {
    // Insert something
    for (int i = 0; i < 100; ++i) {
      procStore.insert(new TestSequentialProcedure(), null);
    }

    // Stop the store
    procStore.stop(false);

    // Remove some bytes from the log
    // (enough to cut the trailer and corrupt some entries)
    FileStatus[] logs = fs.listStatus(logDir);
    assertEquals(1, logs.length);
    corruptLog(logs[0], 1823);

    LoadCounter loader = new LoadCounter();
    storeRestart(loader);
    assertNotNull(procStore.getCorruptedLogs());
    assertEquals(1, procStore.getCorruptedLogs().size());
    assertEquals(87, loader.getLoadedCount());
    assertEquals(0, loader.getCorruptedCount());
  }

  @Test
  public void testCorruptedProcedures() throws Exception {
    // Insert root-procedures
    TestProcedure[] rootProcs = new TestProcedure[10];
    for (int i = 1; i <= rootProcs.length; i++) {
      rootProcs[i - 1] = new TestProcedure(i, 0);
      procStore.insert(rootProcs[i - 1], null);
      rootProcs[i - 1].addStackId(0);
      procStore.update(rootProcs[i - 1]);
    }
    // insert root-child txn
    procStore.rollWriterForTesting();
    for (int i = 1; i <= rootProcs.length; i++) {
      TestProcedure b = new TestProcedure(rootProcs.length + i, i);
      rootProcs[i - 1].addStackId(1);
      procStore.insert(rootProcs[i - 1], new Procedure[] { b });
    }
    // insert child updates
    procStore.rollWriterForTesting();
    for (int i = 1; i <= rootProcs.length; i++) {
      procStore.update(new TestProcedure(rootProcs.length + i, i));
    }

    // Stop the store
    procStore.stop(false);

    // the first log was removed,
    // we have insert-txn and updates in the others so everything is fine
    FileStatus[] logs = fs.listStatus(logDir);
    assertEquals(2, logs.length, Arrays.toString(logs));
    // Order the remaining logs by file name so that logs[0] is the older of the two.
    Arrays.sort(logs, Comparator.comparing((FileStatus status) -> status.getPath().getName()));

    LoadCounter loader = new LoadCounter();
    storeRestart(loader);
    assertEquals(rootProcs.length * 2, loader.getLoadedCount());
    assertEquals(0, loader.getCorruptedCount());

    // Remove the second log, we have lost all the root/parent references
    fs.delete(logs[0].getPath(), false);
    loader.reset();
    storeRestart(loader);
    assertEquals(0, loader.getLoadedCount());
    assertEquals(rootProcs.length, loader.getCorruptedCount());
    for (Procedure<?> proc : loader.getCorrupted()) {
      assertTrue(proc.getParentProcId() <= rootProcs.length, proc.toString());
      assertTrue(proc.getProcId() > rootProcs.length && proc.getProcId() <= (rootProcs.length * 2),
        proc.toString());
    }
  }

  @Test
  public void testRollAndRemove() throws IOException {
    // Insert something in the log
    Procedure<?> proc1 = new TestSequentialProcedure();
    procStore.insert(proc1, null);

    Procedure<?> proc2 = new TestSequentialProcedure();
    procStore.insert(proc2, null);

    // roll the log, now we have 2
    procStore.rollWriterForTesting();
    assertEquals(2, procStore.getActiveLogs().size());

    // everything will be up to date in the second log
    // so we can remove the first one
    procStore.update(proc1);
    procStore.update(proc2);
    assertEquals(1, procStore.getActiveLogs().size());

    // roll the log, now we have 2
    procStore.rollWriterForTesting();
    assertEquals(2, procStore.getActiveLogs().size());

    // remove everything active
    // so we can remove all the logs
    procStore.delete(proc1.getProcId());
    procStore.delete(proc2.getProcId());
    assertEquals(1, procStore.getActiveLogs().size());
  }

  @Test
  public void testFileNotFoundDuringLeaseRecovery() throws IOException {
    final TestProcedure[] procs = new TestProcedure[3];
    for (int i = 0; i < procs.length; ++i) {
      procs[i] = new TestProcedure(i + 1, 0);
      procStore.insert(procs[i], null);
    }
    procStore.rollWriterForTesting();
    for (int i = 0; i < procs.length; ++i) {
      procStore.update(procs[i]);
      procStore.rollWriterForTesting();
    }
    procStore.stop(false);

    FileStatus[] status = fs.listStatus(logDir);
    assertEquals(procs.length + 1, status.length);

    // simulate another active master removing the wals
    procStore = new WALProcedureStore(htu.getConfiguration(), logDir, null, new LeaseRecovery() {
      private int count = 0;

      @Override
      public void recoverFileLease(FileSystem fs, Path path) throws IOException {
        // The first two recovery attempts delete the file and report it as missing.
        if (++count <= 2) {
          fs.delete(path, false);
          LOG.debug("Simulate FileNotFound at count={} for {}", count, path);
          throw new FileNotFoundException("test file not found " + path);
        }
        LOG.debug("Simulate recoverFileLease() at count={} for {}", count, path);
      }
    });

    final LoadCounter loader = new LoadCounter();
    procStore.start(PROCEDURE_STORE_SLOTS);
    procStore.recoverLease();
    procStore.load(loader);
    assertEquals(procs.length, loader.getMaxProcId());
    assertEquals(1, loader.getRunnableCount());
    assertEquals(0, loader.getCompletedCount());
    assertEquals(0, loader.getCorruptedCount());
  }

  @Test
  public void testLogFileAlreadyExists() throws IOException {
    final boolean[] tested = { false };
    WALProcedureStore mStore = Mockito.spy(procStore);

    Answer<Boolean> ans = new Answer<Boolean>() {
      @Override
      public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
        long logId = ((Long) invocationOnMock.getArgument(0)).longValue();
        switch ((int) logId) {
          case 2:
            // Create a file so that real rollWriter() runs into file exists condition.
            // The stream is closed right away: only the existence of the file matters.
            Path logFilePath = mStore.getLogFilePath(logId);
            mStore.getFileSystem().create(logFilePath).close();
            break;
          case 3:
            // Success only when we retry with logId 3
            tested[0] = true;
            break;
          default:
            break;
        }
        return (Boolean) invocationOnMock.callRealMethod();
      }
    };

    // First time Store has one log file, next id will be 2
    Mockito.doAnswer(ans).when(mStore).rollWriter(2);
    // next time its 3
    Mockito.doAnswer(ans).when(mStore).rollWriter(3);

    mStore.recoverLease();
    assertTrue(tested[0]);
  }

  @Test
  public void testLoadChildren() throws Exception {
    TestProcedure a = new TestProcedure(1, 0);
    TestProcedure b = new TestProcedure(2, 1);
    TestProcedure c = new TestProcedure(3, 1);

    // INIT
    procStore.insert(a, null);

    // Run A first step
    a.addStackId(0);
    procStore.update(a);

    // Run A second step
    a.addStackId(1);
    procStore.insert(a, new Procedure[] { b, c });

    // Run B first step
    b.addStackId(2);
    procStore.update(b);

    // Run C first and last step
    c.addStackId(3);
    procStore.update(c);

    // Run B second step
    b.addStackId(4);
    procStore.update(b);

    // back to A
    a.addStackId(5);
    a.setSuccessState();
    procStore.delete(a, new long[] { b.getProcId(), c.getProcId() });
    restartAndAssert(3, 0, 1, 0);
  }

  @Test
  public void testBatchDelete() throws Exception {
    for (int i = 1; i < 10; ++i) {
      procStore.insert(new TestProcedure(i), null);
    }

    // delete nothing
    long[] toDelete = new long[] { 1, 2, 3, 4 };
    procStore.delete(toDelete, 2, 0);
    LoadCounter loader = restartAndAssert(9, 9, 0, 0);
    for (int i = 1; i < 10; ++i) {
      assertTrue(loader.isRunnable(i));
    }

    // delete the full "toDelete" array (2, 4, 6, 8)
    toDelete = new long[] { 2, 4, 6, 8 };
    procStore.delete(toDelete, 0, toDelete.length);
    loader = restartAndAssert(9, 5, 0, 0);
    for (int i = 1; i < 10; ++i) {
      assertEquals(i % 2 != 0, loader.isRunnable(i));
    }

    // delete a slice of "toDelete" (1, 3)
    toDelete = new long[] { 5, 7, 1, 3, 9 };
    procStore.delete(toDelete, 2, 2);
    loader = restartAndAssert(9, 3, 0, 0);
    for (int i = 1; i < 10; ++i) {
      assertEquals(i > 3 && i % 2 != 0, loader.isRunnable(i));
    }

    // delete a single item (5)
    toDelete = new long[] { 5 };
    procStore.delete(toDelete, 0, 1);
    loader = restartAndAssert(9, 2, 0, 0);
    for (int i = 1; i < 10; ++i) {
      assertEquals(i > 5 && i % 2 != 0, loader.isRunnable(i));
    }

    // delete remaining using a slice of "toDelete" (7, 9)
    toDelete = new long[] { 0, 7, 9 };
    procStore.delete(toDelete, 1, 2);
    loader = restartAndAssert(0, 0, 0, 0);
    for (int i = 1; i < 10; ++i) {
      assertFalse(loader.isRunnable(i));
    }
  }

  @Test
  public void testBatchInsert() throws Exception {
    final int count = 10;
    final TestProcedure[] procs = new TestProcedure[count];
    for (int i = 0; i < procs.length; ++i) {
      procs[i] = new TestProcedure(i + 1);
    }
    procStore.insert(procs);
    restartAndAssert(count, count, 0, 0);

    for (int i = 0; i < procs.length; ++i) {
      final long procId = procs[i].getProcId();
      procStore.delete(procId);
      restartAndAssert(procId != count ? count : 0, count - (i + 1), 0, 0);
    }
    procStore.removeInactiveLogsForTesting();
    assertEquals(1, procStore.getActiveLogs().size(), "WALs=" + procStore.getActiveLogs());
  }

  @Test
  public void testWALDirAndWALArchiveDir() throws IOException {
    Configuration conf = htu.getConfiguration();
    // Keep the new store in a local variable: it is never started, and overwriting the procStore
    // field would prevent tearDown() from stopping the store that setUp() started.
    WALProcedureStore store = createWALProcedureStore(conf);
    assertEquals(store.getFileSystem(), store.getWalArchiveDir().getFileSystem(conf));
  }

  /** Creates (without starting) a store from {@code conf} with a no-op lease recovery. */
  private WALProcedureStore createWALProcedureStore(Configuration conf) throws IOException {
    return new WALProcedureStore(conf, new LeaseRecovery() {
      @Override
      public void recoverFileLease(FileSystem fs, Path path) throws IOException {
        // no-op
      }
    });
  }

  private LoadCounter restartAndAssert(long maxProcId, long runnableCount, int completedCount,
    int corruptedCount) throws Exception {
    return ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId, runnableCount,
      completedCount, corruptedCount);
  }

  /**
   * Truncates a log file by copying all but its last {@code dropBytes} bytes to a temporary file
   * and renaming that file over the original.
   * @param logFile   status of the log to truncate; must be longer than {@code dropBytes}
   * @param dropBytes number of trailing bytes to drop
   * @throws IOException if the copy fails or the rename is refused
   */
  private void corruptLog(final FileStatus logFile, final long dropBytes) throws IOException {
    assertTrue(logFile.getLen() > dropBytes);
    LOG.debug("corrupt log {} size={} drop={}", logFile.getPath(), logFile.getLen(), dropBytes);
    Path tmpPath = new Path(testDir, "corrupted.log");
    // try-with-resources closes both streams even when opening the second one fails
    try (InputStream in = fs.open(logFile.getPath()); OutputStream out = fs.create(tmpPath)) {
      IOUtils.copyBytes(in, out, logFile.getLen() - dropBytes, false);
    }
    if (!fs.rename(tmpPath, logFile.getPath())) {
      throw new IOException("Unable to rename");
    }
  }

  /** Restarts the store and checks that exactly {@code procIds.size()} procedures are loaded. */
  private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
    LOG.debug("expected: {}", procIds);
    LoadCounter loader = new LoadCounter();
    storeRestart(loader);
    assertEquals(procIds.size(), loader.getLoadedCount());
    assertEquals(0, loader.getCorruptedCount());
  }

  /**
   * Procedure that takes its id from a shared counter. Only procedures with an even id serialize
   * state data (their own id), which is verified again on deserialization.
   */
  public static class TestSequentialProcedure extends SequentialProcedure<Void> {

    private static final AtomicLong seqId = new AtomicLong(0);

    public TestSequentialProcedure() {
      setProcId(seqId.incrementAndGet());
    }

    @Override
    protected Procedure<Void>[] execute(Void env) {
      return null;
    }

    @Override
    protected void rollback(Void env) {
    }

    @Override
    protected boolean abort(Void env) {
      return false;
    }

    @Override
    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
      long procId = getProcId();
      if (procId % 2 == 0) {
        Int64Value.Builder builder = Int64Value.newBuilder().setValue(procId);
        serializer.serialize(builder.build());
      }
    }

    @Override
    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
      long procId = getProcId();
      if (procId % 2 == 0) {
        Int64Value value = serializer.deserialize(Int64Value.class);
        assertEquals(procId, value.getValue());
      }
    }
  }
}