001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2.store.wal; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.FileNotFoundException; 025import java.io.IOException; 026import java.io.InputStream; 027import java.io.OutputStream; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Comparator; 031import java.util.HashSet; 032import java.util.Set; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileStatus; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.procedure2.Procedure; 041import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 042import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 043import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter; 044import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; 045import org.apache.hadoop.hbase.procedure2.SequentialProcedure; 046import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery; 047import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 048import org.apache.hadoop.hbase.testclassification.MasterTests; 049import org.apache.hadoop.hbase.testclassification.SmallTests; 050import org.apache.hadoop.io.IOUtils; 051import org.junit.After; 052import org.junit.Before; 053import org.junit.ClassRule; 054import org.junit.Test; 055import org.junit.experimental.categories.Category; 056import org.mockito.Mockito; 057import org.mockito.invocation.InvocationOnMock; 058import org.mockito.stubbing.Answer; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value; 063 064@Category({ MasterTests.class, SmallTests.class }) 065public class TestWALProcedureStore { 066 @ClassRule 067 public static final HBaseClassTestRule CLASS_RULE = 068 HBaseClassTestRule.forClass(TestWALProcedureStore.class); 069 070 private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStore.class); 071 072 private static final int PROCEDURE_STORE_SLOTS = 1; 073 074 private WALProcedureStore procStore; 075 076 private HBaseCommonTestingUtility htu; 077 private FileSystem fs; 078 private Path testDir; 079 private Path logDir; 080 081 private void setupConfig(final Configuration conf) { 082 conf.setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, true); 083 } 084 085 @Before 086 public void setUp() throws IOException { 087 htu = new HBaseCommonTestingUtility(); 088 testDir = htu.getDataTestDir(); 089 htu.getConfiguration().set(HConstants.HBASE_DIR, testDir.toString()); 090 fs = testDir.getFileSystem(htu.getConfiguration()); 091 htu.getConfiguration().set(HConstants.HBASE_DIR, testDir.toString()); 092 assertTrue(testDir.depth() > 1); 093 094 setupConfig(htu.getConfiguration()); 095 logDir = new Path(testDir, "proc-logs"); 096 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); 097 procStore.start(PROCEDURE_STORE_SLOTS); 098 procStore.recoverLease(); 099 procStore.load(new LoadCounter()); 100 } 101 102 @After 103 public void tearDown() throws IOException { 104 procStore.stop(false); 105 fs.delete(logDir, true); 106 } 107 108 private void storeRestart(ProcedureStore.ProcedureLoader loader) throws Exception { 109 ProcedureTestingUtility.storeRestart(procStore, loader); 110 } 111 112 @Test 113 public void testEmptyRoll() throws Exception { 114 for (int i = 0; i < 10; ++i) { 115 procStore.periodicRollForTesting(); 116 } 117 assertEquals(1, procStore.getActiveLogs().size()); 118 FileStatus[] status = fs.listStatus(logDir); 119 assertEquals(1, status.length); 120 } 121 122 @Test 123 public void testRestartWithoutData() throws Exception { 124 for (int i = 0; i < 10; ++i) { 125 final LoadCounter loader = new LoadCounter(); 126 storeRestart(loader); 127 } 128 LOG.info("ACTIVE WALs " + procStore.getActiveLogs()); 129 assertEquals(1, procStore.getActiveLogs().size()); 130 FileStatus[] status = fs.listStatus(logDir); 131 assertEquals(1, status.length); 132 } 133 134 /** 135 * Tests that tracker for all old logs are loaded back after procedure store is restarted. 136 */ 137 @Test 138 public void trackersLoadedForAllOldLogs() throws Exception { 139 for (int i = 0; i <= 20; ++i) { 140 procStore.insert(new TestProcedure(i), null); 141 if (i > 0 && (i % 5) == 0) { 142 LoadCounter loader = new LoadCounter(); 143 storeRestart(loader); 144 } 145 } 146 assertEquals(5, procStore.getActiveLogs().size()); 147 for (int i = 0; i < procStore.getActiveLogs().size() - 1; ++i) { 148 ProcedureStoreTracker tracker = procStore.getActiveLogs().get(i).getTracker(); 149 assertTrue(tracker != null && !tracker.isEmpty()); 150 } 151 } 152 153 @Test 154 public void testWalCleanerSequentialClean() throws Exception { 155 final Procedure<?>[] procs = new Procedure[5]; 156 ArrayList<ProcedureWALFile> logs = null; 157 158 // Insert procedures and roll wal after every insert. 159 for (int i = 0; i < procs.length; i++) { 160 procs[i] = new TestSequentialProcedure(); 161 procStore.insert(procs[i], null); 162 procStore.rollWriterForTesting(); 163 logs = procStore.getActiveLogs(); 164 assertEquals(logs.size(), i + 2); // Extra 1 for current ongoing wal. 165 } 166 167 // Delete procedures in sequential order make sure that only the corresponding wal is deleted 168 // from logs list. 169 final int[] deleteOrder = new int[] { 0, 1, 2, 3, 4 }; 170 for (int i = 0; i < deleteOrder.length; i++) { 171 procStore.delete(procs[deleteOrder[i]].getProcId()); 172 procStore.removeInactiveLogsForTesting(); 173 assertFalse(logs.get(deleteOrder[i]).toString(), 174 procStore.getActiveLogs().contains(logs.get(deleteOrder[i]))); 175 assertEquals(procStore.getActiveLogs().size(), procs.length - i); 176 } 177 } 178 179 // Test that wal cleaner doesn't create holes in wal files list i.e. it only deletes files if 180 // they are in the starting of the list. 181 @Test 182 public void testWalCleanerNoHoles() throws Exception { 183 final Procedure<?>[] procs = new Procedure[5]; 184 ArrayList<ProcedureWALFile> logs = null; 185 // Insert procedures and roll wal after every insert. 186 for (int i = 0; i < procs.length; i++) { 187 procs[i] = new TestSequentialProcedure(); 188 procStore.insert(procs[i], null); 189 procStore.rollWriterForTesting(); 190 logs = procStore.getActiveLogs(); 191 assertEquals(i + 2, logs.size()); // Extra 1 for current ongoing wal. 192 } 193 194 for (int i = 1; i < procs.length; i++) { 195 procStore.delete(procs[i].getProcId()); 196 } 197 assertEquals(procs.length + 1, procStore.getActiveLogs().size()); 198 procStore.delete(procs[0].getProcId()); 199 assertEquals(1, procStore.getActiveLogs().size()); 200 } 201 202 @Test 203 public void testWalCleanerUpdates() throws Exception { 204 TestSequentialProcedure p1 = new TestSequentialProcedure(); 205 TestSequentialProcedure p2 = new TestSequentialProcedure(); 206 procStore.insert(p1, null); 207 procStore.insert(p2, null); 208 procStore.rollWriterForTesting(); 209 ProcedureWALFile firstLog = procStore.getActiveLogs().get(0); 210 procStore.update(p1); 211 procStore.rollWriterForTesting(); 212 procStore.update(p2); 213 procStore.rollWriterForTesting(); 214 procStore.removeInactiveLogsForTesting(); 215 assertFalse(procStore.getActiveLogs().contains(firstLog)); 216 } 217 218 @Test 219 public void testWalCleanerUpdatesDontLeaveHoles() throws Exception { 220 TestSequentialProcedure p1 = new TestSequentialProcedure(); 221 TestSequentialProcedure p2 = new TestSequentialProcedure(); 222 procStore.insert(p1, null); 223 procStore.insert(p2, null); 224 procStore.rollWriterForTesting(); // generates first log with p1 + p2 225 ProcedureWALFile log1 = procStore.getActiveLogs().get(0); 226 procStore.update(p2); 227 procStore.rollWriterForTesting(); // generates second log with p2 228 ProcedureWALFile log2 = procStore.getActiveLogs().get(1); 229 procStore.update(p2); 230 procStore.rollWriterForTesting(); // generates third log with p2 231 procStore.removeInactiveLogsForTesting(); // Shouldn't remove 2nd log. 232 assertEquals(4, procStore.getActiveLogs().size()); 233 procStore.update(p1); 234 procStore.rollWriterForTesting(); // generates fourth log with p1 235 procStore.removeInactiveLogsForTesting(); // Should remove first two logs. 236 assertEquals(3, procStore.getActiveLogs().size()); 237 assertFalse(procStore.getActiveLogs().contains(log1)); 238 assertFalse(procStore.getActiveLogs().contains(log2)); 239 } 240 241 @Test 242 public void testWalCleanerWithEmptyRolls() throws Exception { 243 final Procedure<?>[] procs = new Procedure[3]; 244 for (int i = 0; i < procs.length; ++i) { 245 procs[i] = new TestSequentialProcedure(); 246 procStore.insert(procs[i], null); 247 } 248 assertEquals(1, procStore.getActiveLogs().size()); 249 procStore.rollWriterForTesting(); 250 assertEquals(2, procStore.getActiveLogs().size()); 251 procStore.rollWriterForTesting(); 252 assertEquals(3, procStore.getActiveLogs().size()); 253 254 for (int i = 0; i < procs.length; ++i) { 255 procStore.update(procs[i]); 256 procStore.rollWriterForTesting(); 257 procStore.rollWriterForTesting(); 258 if (i < (procs.length - 1)) { 259 assertEquals(3 + ((i + 1) * 2), procStore.getActiveLogs().size()); 260 } 261 } 262 assertEquals(7, procStore.getActiveLogs().size()); 263 264 for (int i = 0; i < procs.length; ++i) { 265 procStore.delete(procs[i].getProcId()); 266 assertEquals(7 - ((i + 1) * 2), procStore.getActiveLogs().size()); 267 } 268 assertEquals(1, procStore.getActiveLogs().size()); 269 } 270 271 @Test 272 public void testEmptyLogLoad() throws Exception { 273 LoadCounter loader = new LoadCounter(); 274 storeRestart(loader); 275 assertEquals(0, loader.getMaxProcId()); 276 assertEquals(0, loader.getLoadedCount()); 277 assertEquals(0, loader.getCorruptedCount()); 278 } 279 280 @Test 281 public void testLoad() throws Exception { 282 Set<Long> procIds = new HashSet<>(); 283 284 // Insert something in the log 285 Procedure<?> proc1 = new TestSequentialProcedure(); 286 procIds.add(proc1.getProcId()); 287 procStore.insert(proc1, null); 288 289 Procedure<?> proc2 = new TestSequentialProcedure(); 290 Procedure<?>[] child2 = new Procedure[2]; 291 child2[0] = new TestSequentialProcedure(); 292 child2[1] = new TestSequentialProcedure(); 293 294 procIds.add(proc2.getProcId()); 295 procIds.add(child2[0].getProcId()); 296 procIds.add(child2[1].getProcId()); 297 procStore.insert(proc2, child2); 298 299 // Verify that everything is there 300 verifyProcIdsOnRestart(procIds); 301 302 // Update and delete something 303 procStore.update(proc1); 304 procStore.update(child2[1]); 305 procStore.delete(child2[1].getProcId()); 306 procIds.remove(child2[1].getProcId()); 307 308 // Verify that everything is there 309 verifyProcIdsOnRestart(procIds); 310 311 // Remove 4 byte from the trailers 312 procStore.stop(false); 313 FileStatus[] logs = fs.listStatus(logDir); 314 assertEquals(3, logs.length); 315 for (int i = 0; i < logs.length; ++i) { 316 corruptLog(logs[i], 4); 317 } 318 verifyProcIdsOnRestart(procIds); 319 } 320 321 @Test 322 public void testNoTrailerDoubleRestart() throws Exception { 323 // log-0001: proc 0, 1 and 2 are inserted 324 Procedure<?> proc0 = new TestSequentialProcedure(); 325 procStore.insert(proc0, null); 326 Procedure<?> proc1 = new TestSequentialProcedure(); 327 procStore.insert(proc1, null); 328 Procedure<?> proc2 = new TestSequentialProcedure(); 329 procStore.insert(proc2, null); 330 procStore.rollWriterForTesting(); 331 332 // log-0002: proc 1 deleted 333 procStore.delete(proc1.getProcId()); 334 procStore.rollWriterForTesting(); 335 336 // log-0003: proc 2 is update 337 procStore.update(proc2); 338 procStore.rollWriterForTesting(); 339 340 // log-0004: proc 2 deleted 341 procStore.delete(proc2.getProcId()); 342 343 // stop the store and remove the trailer 344 procStore.stop(false); 345 FileStatus[] logs = fs.listStatus(logDir); 346 assertEquals(4, logs.length); 347 for (int i = 0; i < logs.length; ++i) { 348 corruptLog(logs[i], 4); 349 } 350 351 // Test Load 1 352 // Restart the store (avoid cleaning up the files, to check the rebuilded trackers) 353 htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false); 354 LoadCounter loader = new LoadCounter(); 355 storeRestart(loader); 356 assertEquals(1, loader.getLoadedCount()); 357 assertEquals(0, loader.getCorruptedCount()); 358 359 // Test Load 2 360 assertEquals(5, fs.listStatus(logDir).length); 361 loader = new LoadCounter(); 362 storeRestart(loader); 363 assertEquals(1, loader.getLoadedCount()); 364 assertEquals(0, loader.getCorruptedCount()); 365 366 // remove proc-0 367 procStore.delete(proc0.getProcId()); 368 procStore.periodicRollForTesting(); 369 assertEquals(1, fs.listStatus(logDir).length); 370 storeRestart(loader); 371 } 372 373 @Test 374 public void testProcIdHoles() throws Exception { 375 // Insert 376 for (int i = 0; i < 100; i += 2) { 377 procStore.insert(new TestProcedure(i), null); 378 if (i > 0 && (i % 10) == 0) { 379 LoadCounter loader = new LoadCounter(); 380 storeRestart(loader); 381 assertEquals(0, loader.getCorruptedCount()); 382 assertEquals((i / 2) + 1, loader.getLoadedCount()); 383 } 384 } 385 assertEquals(10, procStore.getActiveLogs().size()); 386 387 // Delete 388 for (int i = 0; i < 100; i += 2) { 389 procStore.delete(i); 390 } 391 assertEquals(1, procStore.getActiveLogs().size()); 392 393 LoadCounter loader = new LoadCounter(); 394 storeRestart(loader); 395 assertEquals(0, loader.getLoadedCount()); 396 assertEquals(0, loader.getCorruptedCount()); 397 } 398 399 @Test 400 public void testCorruptedTrailer() throws Exception { 401 // Insert something 402 for (int i = 0; i < 100; ++i) { 403 procStore.insert(new TestSequentialProcedure(), null); 404 } 405 406 // Stop the store 407 procStore.stop(false); 408 409 // Remove 4 byte from the trailer 410 FileStatus[] logs = fs.listStatus(logDir); 411 assertEquals(1, logs.length); 412 corruptLog(logs[0], 4); 413 414 LoadCounter loader = new LoadCounter(); 415 storeRestart(loader); 416 assertEquals(100, loader.getLoadedCount()); 417 assertEquals(0, loader.getCorruptedCount()); 418 } 419 420 private static void assertUpdated(final ProcedureStoreTracker tracker, final Procedure<?>[] procs, 421 final int[] updatedProcs, final int[] nonUpdatedProcs) { 422 for (int index : updatedProcs) { 423 long procId = procs[index].getProcId(); 424 assertTrue("Procedure id : " + procId, tracker.isModified(procId)); 425 } 426 for (int index : nonUpdatedProcs) { 427 long procId = procs[index].getProcId(); 428 assertFalse("Procedure id : " + procId, tracker.isModified(procId)); 429 } 430 } 431 432 private static void assertDeleted(final ProcedureStoreTracker tracker, final Procedure<?>[] procs, 433 final int[] deletedProcs, final int[] nonDeletedProcs) { 434 for (int index : deletedProcs) { 435 long procId = procs[index].getProcId(); 436 assertEquals("Procedure id : " + procId, ProcedureStoreTracker.DeleteState.YES, 437 tracker.isDeleted(procId)); 438 } 439 for (int index : nonDeletedProcs) { 440 long procId = procs[index].getProcId(); 441 assertEquals("Procedure id : " + procId, ProcedureStoreTracker.DeleteState.NO, 442 tracker.isDeleted(procId)); 443 } 444 } 445 446 @Test 447 public void testCorruptedTrailersRebuild() throws Exception { 448 final Procedure<?>[] procs = new Procedure[6]; 449 for (int i = 0; i < procs.length; ++i) { 450 procs[i] = new TestSequentialProcedure(); 451 } 452 // Log State (I=insert, U=updated, D=delete) 453 // | log 1 | log 2 | log 3 | 454 // 0 | I, D | | | 455 // 1 | I | | | 456 // 2 | I | D | | 457 // 3 | I | U | | 458 // 4 | | I | D | 459 // 5 | | | I | 460 procStore.insert(procs[0], null); 461 procStore.insert(procs[1], null); 462 procStore.insert(procs[2], null); 463 procStore.insert(procs[3], null); 464 procStore.delete(procs[0].getProcId()); 465 procStore.rollWriterForTesting(); 466 procStore.delete(procs[2].getProcId()); 467 procStore.update(procs[3]); 468 procStore.insert(procs[4], null); 469 procStore.rollWriterForTesting(); 470 procStore.delete(procs[4].getProcId()); 471 procStore.insert(procs[5], null); 472 473 // Stop the store 474 procStore.stop(false); 475 476 // Remove 4 byte from the trailers 477 final FileStatus[] logs = fs.listStatus(logDir); 478 assertEquals(3, logs.length); 479 for (int i = 0; i < logs.length; ++i) { 480 corruptLog(logs[i], 4); 481 } 482 483 // Restart the store (avoid cleaning up the files, to check the rebuilded trackers) 484 htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false); 485 final LoadCounter loader = new LoadCounter(); 486 storeRestart(loader); 487 assertEquals(3, loader.getLoadedCount()); // procs 1, 3 and 5 488 assertEquals(0, loader.getCorruptedCount()); 489 490 // Check the Trackers 491 final ArrayList<ProcedureWALFile> walFiles = procStore.getActiveLogs(); 492 LOG.info("WALs " + walFiles); 493 assertEquals(4, walFiles.size()); 494 LOG.info("Checking wal " + walFiles.get(0)); 495 assertUpdated(walFiles.get(0).getTracker(), procs, new int[] { 0, 1, 2, 3 }, 496 new int[] { 4, 5 }); 497 LOG.info("Checking wal " + walFiles.get(1)); 498 assertUpdated(walFiles.get(1).getTracker(), procs, new int[] { 2, 3, 4 }, 499 new int[] { 0, 1, 5 }); 500 LOG.info("Checking wal " + walFiles.get(2)); 501 assertUpdated(walFiles.get(2).getTracker(), procs, new int[] { 4, 5 }, 502 new int[] { 0, 1, 2, 3 }); 503 LOG.info("Checking global tracker "); 504 assertDeleted(procStore.getStoreTracker(), procs, new int[] { 0, 2, 4 }, new int[] { 1, 3, 5 }); 505 } 506 507 @Test 508 public void testCorruptedEntries() throws Exception { 509 // Insert something 510 for (int i = 0; i < 100; ++i) { 511 procStore.insert(new TestSequentialProcedure(), null); 512 } 513 514 // Stop the store 515 procStore.stop(false); 516 517 // Remove some byte from the log 518 // (enough to cut the trailer and corrupt some entries) 519 FileStatus[] logs = fs.listStatus(logDir); 520 assertEquals(1, logs.length); 521 corruptLog(logs[0], 1823); 522 523 LoadCounter loader = new LoadCounter(); 524 storeRestart(loader); 525 assertTrue(procStore.getCorruptedLogs() != null); 526 assertEquals(1, procStore.getCorruptedLogs().size()); 527 assertEquals(87, loader.getLoadedCount()); 528 assertEquals(0, loader.getCorruptedCount()); 529 } 530 531 @Test 532 public void testCorruptedProcedures() throws Exception { 533 // Insert root-procedures 534 TestProcedure[] rootProcs = new TestProcedure[10]; 535 for (int i = 1; i <= rootProcs.length; i++) { 536 rootProcs[i - 1] = new TestProcedure(i, 0); 537 procStore.insert(rootProcs[i - 1], null); 538 rootProcs[i - 1].addStackId(0); 539 procStore.update(rootProcs[i - 1]); 540 } 541 // insert root-child txn 542 procStore.rollWriterForTesting(); 543 for (int i = 1; i <= rootProcs.length; i++) { 544 TestProcedure b = new TestProcedure(rootProcs.length + i, i); 545 rootProcs[i - 1].addStackId(1); 546 procStore.insert(rootProcs[i - 1], new Procedure[] { b }); 547 } 548 // insert child updates 549 procStore.rollWriterForTesting(); 550 for (int i = 1; i <= rootProcs.length; i++) { 551 procStore.update(new TestProcedure(rootProcs.length + i, i)); 552 } 553 554 // Stop the store 555 procStore.stop(false); 556 557 // the first log was removed, 558 // we have insert-txn and updates in the others so everything is fine 559 FileStatus[] logs = fs.listStatus(logDir); 560 assertEquals(Arrays.toString(logs), 2, logs.length); 561 Arrays.sort(logs, new Comparator<FileStatus>() { 562 @Override 563 public int compare(FileStatus o1, FileStatus o2) { 564 return o1.getPath().getName().compareTo(o2.getPath().getName()); 565 } 566 }); 567 568 LoadCounter loader = new LoadCounter(); 569 storeRestart(loader); 570 assertEquals(rootProcs.length * 2, loader.getLoadedCount()); 571 assertEquals(0, loader.getCorruptedCount()); 572 573 // Remove the second log, we have lost all the root/parent references 574 fs.delete(logs[0].getPath(), false); 575 loader.reset(); 576 storeRestart(loader); 577 assertEquals(0, loader.getLoadedCount()); 578 assertEquals(rootProcs.length, loader.getCorruptedCount()); 579 for (Procedure<?> proc : loader.getCorrupted()) { 580 assertTrue(proc.toString(), proc.getParentProcId() <= rootProcs.length); 581 assertTrue(proc.toString(), 582 proc.getProcId() > rootProcs.length && proc.getProcId() <= (rootProcs.length * 2)); 583 } 584 } 585 586 @Test 587 public void testRollAndRemove() throws IOException { 588 // Insert something in the log 589 Procedure<?> proc1 = new TestSequentialProcedure(); 590 procStore.insert(proc1, null); 591 592 Procedure<?> proc2 = new TestSequentialProcedure(); 593 procStore.insert(proc2, null); 594 595 // roll the log, now we have 2 596 procStore.rollWriterForTesting(); 597 assertEquals(2, procStore.getActiveLogs().size()); 598 599 // everything will be up to date in the second log 600 // so we can remove the first one 601 procStore.update(proc1); 602 procStore.update(proc2); 603 assertEquals(1, procStore.getActiveLogs().size()); 604 605 // roll the log, now we have 2 606 procStore.rollWriterForTesting(); 607 assertEquals(2, procStore.getActiveLogs().size()); 608 609 // remove everything active 610 // so we can remove all the logs 611 procStore.delete(proc1.getProcId()); 612 procStore.delete(proc2.getProcId()); 613 assertEquals(1, procStore.getActiveLogs().size()); 614 } 615 616 @Test 617 public void testFileNotFoundDuringLeaseRecovery() throws IOException { 618 final TestProcedure[] procs = new TestProcedure[3]; 619 for (int i = 0; i < procs.length; ++i) { 620 procs[i] = new TestProcedure(i + 1, 0); 621 procStore.insert(procs[i], null); 622 } 623 procStore.rollWriterForTesting(); 624 for (int i = 0; i < procs.length; ++i) { 625 procStore.update(procs[i]); 626 procStore.rollWriterForTesting(); 627 } 628 procStore.stop(false); 629 630 FileStatus[] status = fs.listStatus(logDir); 631 assertEquals(procs.length + 1, status.length); 632 633 // simulate another active master removing the wals 634 procStore = new WALProcedureStore(htu.getConfiguration(), logDir, null, new LeaseRecovery() { 635 private int count = 0; 636 637 @Override 638 public void recoverFileLease(FileSystem fs, Path path) throws IOException { 639 if (++count <= 2) { 640 fs.delete(path, false); 641 LOG.debug("Simulate FileNotFound at count=" + count + " for " + path); 642 throw new FileNotFoundException("test file not found " + path); 643 } 644 LOG.debug("Simulate recoverFileLease() at count=" + count + " for " + path); 645 } 646 }); 647 648 final LoadCounter loader = new LoadCounter(); 649 procStore.start(PROCEDURE_STORE_SLOTS); 650 procStore.recoverLease(); 651 procStore.load(loader); 652 assertEquals(procs.length, loader.getMaxProcId()); 653 assertEquals(1, loader.getRunnableCount()); 654 assertEquals(0, loader.getCompletedCount()); 655 assertEquals(0, loader.getCorruptedCount()); 656 } 657 658 @Test 659 public void testLogFileAlreadyExists() throws IOException { 660 final boolean[] tested = { false }; 661 WALProcedureStore mStore = Mockito.spy(procStore); 662 663 Answer<Boolean> ans = new Answer<Boolean>() { 664 @Override 665 public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable { 666 long logId = ((Long) invocationOnMock.getArgument(0)).longValue(); 667 switch ((int) logId) { 668 case 2: 669 // Create a file so that real rollWriter() runs into file exists condition 670 Path logFilePath = mStore.getLogFilePath(logId); 671 mStore.getFileSystem().create(logFilePath); 672 break; 673 case 3: 674 // Success only when we retry with logId 3 675 tested[0] = true; 676 default: 677 break; 678 } 679 return (Boolean) invocationOnMock.callRealMethod(); 680 } 681 }; 682 683 // First time Store has one log file, next id will be 2 684 Mockito.doAnswer(ans).when(mStore).rollWriter(2); 685 // next time its 3 686 Mockito.doAnswer(ans).when(mStore).rollWriter(3); 687 688 mStore.recoverLease(); 689 assertTrue(tested[0]); 690 } 691 692 @Test 693 public void testLoadChildren() throws Exception { 694 TestProcedure a = new TestProcedure(1, 0); 695 TestProcedure b = new TestProcedure(2, 1); 696 TestProcedure c = new TestProcedure(3, 1); 697 698 // INIT 699 procStore.insert(a, null); 700 701 // Run A first step 702 a.addStackId(0); 703 procStore.update(a); 704 705 // Run A second step 706 a.addStackId(1); 707 procStore.insert(a, new Procedure[] { b, c }); 708 709 // Run B first step 710 b.addStackId(2); 711 procStore.update(b); 712 713 // Run C first and last step 714 c.addStackId(3); 715 procStore.update(c); 716 717 // Run B second setp 718 b.addStackId(4); 719 procStore.update(b); 720 721 // back to A 722 a.addStackId(5); 723 a.setSuccessState(); 724 procStore.delete(a, new long[] { b.getProcId(), c.getProcId() }); 725 restartAndAssert(3, 0, 1, 0); 726 } 727 728 @Test 729 public void testBatchDelete() throws Exception { 730 for (int i = 1; i < 10; ++i) { 731 procStore.insert(new TestProcedure(i), null); 732 } 733 734 // delete nothing 735 long[] toDelete = new long[] { 1, 2, 3, 4 }; 736 procStore.delete(toDelete, 2, 0); 737 LoadCounter loader = restartAndAssert(9, 9, 0, 0); 738 for (int i = 1; i < 10; ++i) { 739 assertEquals(true, loader.isRunnable(i)); 740 } 741 742 // delete the full "toDelete" array (2, 4, 6, 8) 743 toDelete = new long[] { 2, 4, 6, 8 }; 744 procStore.delete(toDelete, 0, toDelete.length); 745 loader = restartAndAssert(9, 5, 0, 0); 746 for (int i = 1; i < 10; ++i) { 747 assertEquals(i % 2 != 0, loader.isRunnable(i)); 748 } 749 750 // delete a slice of "toDelete" (1, 3) 751 toDelete = new long[] { 5, 7, 1, 3, 9 }; 752 procStore.delete(toDelete, 2, 2); 753 loader = restartAndAssert(9, 3, 0, 0); 754 for (int i = 1; i < 10; ++i) { 755 assertEquals(i > 3 && i % 2 != 0, loader.isRunnable(i)); 756 } 757 758 // delete a single item (5) 759 toDelete = new long[] { 5 }; 760 procStore.delete(toDelete, 0, 1); 761 loader = restartAndAssert(9, 2, 0, 0); 762 for (int i = 1; i < 10; ++i) { 763 assertEquals(i > 5 && i % 2 != 0, loader.isRunnable(i)); 764 } 765 766 // delete remaining using a slice of "toDelete" (7, 9) 767 toDelete = new long[] { 0, 7, 9 }; 768 procStore.delete(toDelete, 1, 2); 769 loader = restartAndAssert(0, 0, 0, 0); 770 for (int i = 1; i < 10; ++i) { 771 assertEquals(false, loader.isRunnable(i)); 772 } 773 } 774 775 @Test 776 public void testBatchInsert() throws Exception { 777 final int count = 10; 778 final TestProcedure[] procs = new TestProcedure[count]; 779 for (int i = 0; i < procs.length; ++i) { 780 procs[i] = new TestProcedure(i + 1); 781 } 782 procStore.insert(procs); 783 restartAndAssert(count, count, 0, 0); 784 785 for (int i = 0; i < procs.length; ++i) { 786 final long procId = procs[i].getProcId(); 787 procStore.delete(procId); 788 restartAndAssert(procId != count ? count : 0, count - (i + 1), 0, 0); 789 } 790 procStore.removeInactiveLogsForTesting(); 791 assertEquals("WALs=" + procStore.getActiveLogs(), 1, procStore.getActiveLogs().size()); 792 } 793 794 @Test 795 public void testWALDirAndWALArchiveDir() throws IOException { 796 Configuration conf = htu.getConfiguration(); 797 procStore = createWALProcedureStore(conf); 798 assertEquals(procStore.getFileSystem(), procStore.getWalArchiveDir().getFileSystem(conf)); 799 } 800 801 private WALProcedureStore createWALProcedureStore(Configuration conf) throws IOException { 802 return new WALProcedureStore(conf, new LeaseRecovery() { 803 @Override 804 public void recoverFileLease(FileSystem fs, Path path) throws IOException { 805 // no-op 806 } 807 }); 808 } 809 810 private LoadCounter restartAndAssert(long maxProcId, long runnableCount, int completedCount, 811 int corruptedCount) throws Exception { 812 return ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId, runnableCount, 813 completedCount, corruptedCount); 814 } 815 816 private void corruptLog(final FileStatus logFile, final long dropBytes) throws IOException { 817 assertTrue(logFile.getLen() > dropBytes); 818 LOG.debug( 819 "corrupt log " + logFile.getPath() + " size=" + logFile.getLen() + " drop=" + dropBytes); 820 Path tmpPath = new Path(testDir, "corrupted.log"); 821 InputStream in = fs.open(logFile.getPath()); 822 OutputStream out = fs.create(tmpPath); 823 IOUtils.copyBytes(in, out, logFile.getLen() - dropBytes, true); 824 if (!fs.rename(tmpPath, logFile.getPath())) { 825 throw new IOException("Unable to rename"); 826 } 827 } 828 829 private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception { 830 LOG.debug("expected: " + procIds); 831 LoadCounter loader = new LoadCounter(); 832 storeRestart(loader); 833 assertEquals(procIds.size(), loader.getLoadedCount()); 834 assertEquals(0, loader.getCorruptedCount()); 835 } 836 837 public static class TestSequentialProcedure extends SequentialProcedure<Void> { 838 private static long seqid = 0; 839 840 public TestSequentialProcedure() { 841 setProcId(++seqid); 842 } 843 844 @Override 845 protected Procedure<Void>[] execute(Void env) { 846 return null; 847 } 848 849 @Override 850 protected void rollback(Void env) { 851 } 852 853 @Override 854 protected boolean abort(Void env) { 855 return false; 856 } 857 858 @Override 859 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 860 long procId = getProcId(); 861 if (procId % 2 == 0) { 862 Int64Value.Builder builder = Int64Value.newBuilder().setValue(procId); 863 serializer.serialize(builder.build()); 864 } 865 } 866 867 @Override 868 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 869 long procId = getProcId(); 870 if (procId % 2 == 0) { 871 Int64Value value = serializer.deserialize(Int64Value.class); 872 assertEquals(procId, value.getValue()); 873 } 874 } 875 } 876}