001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2.store.wal; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.FileNotFoundException; 025import java.io.IOException; 026import java.io.InputStream; 027import java.io.OutputStream; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Comparator; 031import java.util.HashSet; 032import java.util.Set; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileStatus; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.procedure2.Procedure; 041import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 042import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 043import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter; 044import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; 045import org.apache.hadoop.hbase.procedure2.SequentialProcedure; 046import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery; 047import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 048import org.apache.hadoop.hbase.testclassification.MasterTests; 049import org.apache.hadoop.hbase.testclassification.SmallTests; 050import org.apache.hadoop.io.IOUtils; 051import org.junit.After; 052import org.junit.Before; 053import org.junit.ClassRule; 054import org.junit.Test; 055import org.junit.experimental.categories.Category; 056import org.mockito.Mockito; 057import org.mockito.invocation.InvocationOnMock; 058import org.mockito.stubbing.Answer; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value; 063 064@Category({MasterTests.class, SmallTests.class}) 065public class TestWALProcedureStore { 066 @ClassRule 067 public static final HBaseClassTestRule CLASS_RULE = 068 HBaseClassTestRule.forClass(TestWALProcedureStore.class); 069 070 private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStore.class); 071 072 private static final int PROCEDURE_STORE_SLOTS = 1; 073 074 private WALProcedureStore procStore; 075 076 private HBaseCommonTestingUtility htu; 077 private FileSystem fs; 078 private Path testDir; 079 private Path logDir; 080 081 private void setupConfig(final Configuration conf) { 082 conf.setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, true); 083 } 084 085 @Before 086 public void setUp() throws IOException { 087 htu = new HBaseCommonTestingUtility(); 088 testDir = htu.getDataTestDir(); 089 htu.getConfiguration().set(HConstants.HBASE_DIR, testDir.toString()); 090 fs = testDir.getFileSystem(htu.getConfiguration()); 091 htu.getConfiguration().set(HConstants.HBASE_DIR, testDir.toString()); 092 assertTrue(testDir.depth() > 1); 093 094 setupConfig(htu.getConfiguration()); 095 logDir = new Path(testDir, "proc-logs"); 096 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); 097 procStore.start(PROCEDURE_STORE_SLOTS); 098 procStore.recoverLease(); 099 procStore.load(new LoadCounter()); 100 } 101 102 @After 103 public void tearDown() throws IOException { 104 procStore.stop(false); 105 fs.delete(logDir, true); 106 } 107 108 private void storeRestart(ProcedureStore.ProcedureLoader loader) throws Exception { 109 ProcedureTestingUtility.storeRestart(procStore, loader); 110 } 111 112 @Test 113 public void testEmptyRoll() throws Exception { 114 for (int i = 0; i < 10; ++i) { 115 procStore.periodicRollForTesting(); 116 } 117 assertEquals(1, procStore.getActiveLogs().size()); 118 FileStatus[] status = fs.listStatus(logDir); 119 assertEquals(1, status.length); 120 } 121 122 @Test 123 public void testRestartWithoutData() throws Exception { 124 for (int i = 0; i < 10; ++i) { 125 final LoadCounter loader = new LoadCounter(); 126 storeRestart(loader); 127 } 128 LOG.info("ACTIVE WALs " + procStore.getActiveLogs()); 129 assertEquals(1, procStore.getActiveLogs().size()); 130 FileStatus[] status = fs.listStatus(logDir); 131 assertEquals(1, status.length); 132 } 133 134 /** 135 * Tests that tracker for all old logs are loaded back after procedure store is restarted. 136 */ 137 @Test 138 public void trackersLoadedForAllOldLogs() throws Exception { 139 for (int i = 0; i <= 20; ++i) { 140 procStore.insert(new TestProcedure(i), null); 141 if (i > 0 && (i % 5) == 0) { 142 LoadCounter loader = new LoadCounter(); 143 storeRestart(loader); 144 } 145 } 146 assertEquals(5, procStore.getActiveLogs().size()); 147 for (int i = 0; i < procStore.getActiveLogs().size() - 1; ++i) { 148 ProcedureStoreTracker tracker = procStore.getActiveLogs().get(i).getTracker(); 149 assertTrue(tracker != null && !tracker.isEmpty()); 150 } 151 } 152 153 @Test 154 public void testWalCleanerSequentialClean() throws Exception { 155 final Procedure<?>[] procs = new Procedure[5]; 156 ArrayList<ProcedureWALFile> logs = null; 157 158 // Insert procedures and roll wal after every insert. 159 for (int i = 0; i < procs.length; i++) { 160 procs[i] = new TestSequentialProcedure(); 161 procStore.insert(procs[i], null); 162 procStore.rollWriterForTesting(); 163 logs = procStore.getActiveLogs(); 164 assertEquals(logs.size(), i + 2); // Extra 1 for current ongoing wal. 165 } 166 167 // Delete procedures in sequential order make sure that only the corresponding wal is deleted 168 // from logs list. 169 final int[] deleteOrder = new int[] { 0, 1, 2, 3, 4 }; 170 for (int i = 0; i < deleteOrder.length; i++) { 171 procStore.delete(procs[deleteOrder[i]].getProcId()); 172 procStore.removeInactiveLogsForTesting(); 173 assertFalse(logs.get(deleteOrder[i]).toString(), 174 procStore.getActiveLogs().contains(logs.get(deleteOrder[i]))); 175 assertEquals(procStore.getActiveLogs().size(), procs.length - i); 176 } 177 } 178 179 180 // Test that wal cleaner doesn't create holes in wal files list i.e. it only deletes files if 181 // they are in the starting of the list. 182 @Test 183 public void testWalCleanerNoHoles() throws Exception { 184 final Procedure<?>[] procs = new Procedure[5]; 185 ArrayList<ProcedureWALFile> logs = null; 186 // Insert procedures and roll wal after every insert. 187 for (int i = 0; i < procs.length; i++) { 188 procs[i] = new TestSequentialProcedure(); 189 procStore.insert(procs[i], null); 190 procStore.rollWriterForTesting(); 191 logs = procStore.getActiveLogs(); 192 assertEquals(i + 2, logs.size()); // Extra 1 for current ongoing wal. 193 } 194 195 for (int i = 1; i < procs.length; i++) { 196 procStore.delete(procs[i].getProcId()); 197 } 198 assertEquals(procs.length + 1, procStore.getActiveLogs().size()); 199 procStore.delete(procs[0].getProcId()); 200 assertEquals(1, procStore.getActiveLogs().size()); 201 } 202 203 @Test 204 public void testWalCleanerUpdates() throws Exception { 205 TestSequentialProcedure p1 = new TestSequentialProcedure(); 206 TestSequentialProcedure p2 = new TestSequentialProcedure(); 207 procStore.insert(p1, null); 208 procStore.insert(p2, null); 209 procStore.rollWriterForTesting(); 210 ProcedureWALFile firstLog = procStore.getActiveLogs().get(0); 211 procStore.update(p1); 212 procStore.rollWriterForTesting(); 213 procStore.update(p2); 214 procStore.rollWriterForTesting(); 215 procStore.removeInactiveLogsForTesting(); 216 assertFalse(procStore.getActiveLogs().contains(firstLog)); 217 } 218 219 @Test 220 public void testWalCleanerUpdatesDontLeaveHoles() throws Exception { 221 TestSequentialProcedure p1 = new TestSequentialProcedure(); 222 TestSequentialProcedure p2 = new TestSequentialProcedure(); 223 procStore.insert(p1, null); 224 procStore.insert(p2, null); 225 procStore.rollWriterForTesting(); // generates first log with p1 + p2 226 ProcedureWALFile log1 = procStore.getActiveLogs().get(0); 227 procStore.update(p2); 228 procStore.rollWriterForTesting(); // generates second log with p2 229 ProcedureWALFile log2 = procStore.getActiveLogs().get(1); 230 procStore.update(p2); 231 procStore.rollWriterForTesting(); // generates third log with p2 232 procStore.removeInactiveLogsForTesting(); // Shouldn't remove 2nd log. 233 assertEquals(4, procStore.getActiveLogs().size()); 234 procStore.update(p1); 235 procStore.rollWriterForTesting(); // generates fourth log with p1 236 procStore.removeInactiveLogsForTesting(); // Should remove first two logs. 237 assertEquals(3, procStore.getActiveLogs().size()); 238 assertFalse(procStore.getActiveLogs().contains(log1)); 239 assertFalse(procStore.getActiveLogs().contains(log2)); 240 } 241 242 @Test 243 public void testWalCleanerWithEmptyRolls() throws Exception { 244 final Procedure<?>[] procs = new Procedure[3]; 245 for (int i = 0; i < procs.length; ++i) { 246 procs[i] = new TestSequentialProcedure(); 247 procStore.insert(procs[i], null); 248 } 249 assertEquals(1, procStore.getActiveLogs().size()); 250 procStore.rollWriterForTesting(); 251 assertEquals(2, procStore.getActiveLogs().size()); 252 procStore.rollWriterForTesting(); 253 assertEquals(3, procStore.getActiveLogs().size()); 254 255 for (int i = 0; i < procs.length; ++i) { 256 procStore.update(procs[i]); 257 procStore.rollWriterForTesting(); 258 procStore.rollWriterForTesting(); 259 if (i < (procs.length - 1)) { 260 assertEquals(3 + ((i + 1) * 2), procStore.getActiveLogs().size()); 261 } 262 } 263 assertEquals(7, procStore.getActiveLogs().size()); 264 265 for (int i = 0; i < procs.length; ++i) { 266 procStore.delete(procs[i].getProcId()); 267 assertEquals(7 - ((i + 1) * 2), procStore.getActiveLogs().size()); 268 } 269 assertEquals(1, procStore.getActiveLogs().size()); 270 } 271 272 @Test 273 public void testEmptyLogLoad() throws Exception { 274 LoadCounter loader = new LoadCounter(); 275 storeRestart(loader); 276 assertEquals(0, loader.getMaxProcId()); 277 assertEquals(0, loader.getLoadedCount()); 278 assertEquals(0, loader.getCorruptedCount()); 279 } 280 281 @Test 282 public void testLoad() throws Exception { 283 Set<Long> procIds = new HashSet<>(); 284 285 // Insert something in the log 286 Procedure<?> proc1 = new TestSequentialProcedure(); 287 procIds.add(proc1.getProcId()); 288 procStore.insert(proc1, null); 289 290 Procedure<?> proc2 = new TestSequentialProcedure(); 291 Procedure<?>[] child2 = new Procedure[2]; 292 child2[0] = new TestSequentialProcedure(); 293 child2[1] = new TestSequentialProcedure(); 294 295 procIds.add(proc2.getProcId()); 296 procIds.add(child2[0].getProcId()); 297 procIds.add(child2[1].getProcId()); 298 procStore.insert(proc2, child2); 299 300 // Verify that everything is there 301 verifyProcIdsOnRestart(procIds); 302 303 // Update and delete something 304 procStore.update(proc1); 305 procStore.update(child2[1]); 306 procStore.delete(child2[1].getProcId()); 307 procIds.remove(child2[1].getProcId()); 308 309 // Verify that everything is there 310 verifyProcIdsOnRestart(procIds); 311 312 // Remove 4 byte from the trailers 313 procStore.stop(false); 314 FileStatus[] logs = fs.listStatus(logDir); 315 assertEquals(3, logs.length); 316 for (int i = 0; i < logs.length; ++i) { 317 corruptLog(logs[i], 4); 318 } 319 verifyProcIdsOnRestart(procIds); 320 } 321 322 @Test 323 public void testNoTrailerDoubleRestart() throws Exception { 324 // log-0001: proc 0, 1 and 2 are inserted 325 Procedure<?> proc0 = new TestSequentialProcedure(); 326 procStore.insert(proc0, null); 327 Procedure<?> proc1 = new TestSequentialProcedure(); 328 procStore.insert(proc1, null); 329 Procedure<?> proc2 = new TestSequentialProcedure(); 330 procStore.insert(proc2, null); 331 procStore.rollWriterForTesting(); 332 333 // log-0002: proc 1 deleted 334 procStore.delete(proc1.getProcId()); 335 procStore.rollWriterForTesting(); 336 337 // log-0003: proc 2 is update 338 procStore.update(proc2); 339 procStore.rollWriterForTesting(); 340 341 // log-0004: proc 2 deleted 342 procStore.delete(proc2.getProcId()); 343 344 // stop the store and remove the trailer 345 procStore.stop(false); 346 FileStatus[] logs = fs.listStatus(logDir); 347 assertEquals(4, logs.length); 348 for (int i = 0; i < logs.length; ++i) { 349 corruptLog(logs[i], 4); 350 } 351 352 // Test Load 1 353 // Restart the store (avoid cleaning up the files, to check the rebuilded trackers) 354 htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false); 355 LoadCounter loader = new LoadCounter(); 356 storeRestart(loader); 357 assertEquals(1, loader.getLoadedCount()); 358 assertEquals(0, loader.getCorruptedCount()); 359 360 // Test Load 2 361 assertEquals(5, fs.listStatus(logDir).length); 362 loader = new LoadCounter(); 363 storeRestart(loader); 364 assertEquals(1, loader.getLoadedCount()); 365 assertEquals(0, loader.getCorruptedCount()); 366 367 // remove proc-0 368 procStore.delete(proc0.getProcId()); 369 procStore.periodicRollForTesting(); 370 assertEquals(1, fs.listStatus(logDir).length); 371 storeRestart(loader); 372 } 373 374 @Test 375 public void testProcIdHoles() throws Exception { 376 // Insert 377 for (int i = 0; i < 100; i += 2) { 378 procStore.insert(new TestProcedure(i), null); 379 if (i > 0 && (i % 10) == 0) { 380 LoadCounter loader = new LoadCounter(); 381 storeRestart(loader); 382 assertEquals(0, loader.getCorruptedCount()); 383 assertEquals((i / 2) + 1, loader.getLoadedCount()); 384 } 385 } 386 assertEquals(10, procStore.getActiveLogs().size()); 387 388 // Delete 389 for (int i = 0; i < 100; i += 2) { 390 procStore.delete(i); 391 } 392 assertEquals(1, procStore.getActiveLogs().size()); 393 394 LoadCounter loader = new LoadCounter(); 395 storeRestart(loader); 396 assertEquals(0, loader.getLoadedCount()); 397 assertEquals(0, loader.getCorruptedCount()); 398 } 399 400 @Test 401 public void testCorruptedTrailer() throws Exception { 402 // Insert something 403 for (int i = 0; i < 100; ++i) { 404 procStore.insert(new TestSequentialProcedure(), null); 405 } 406 407 // Stop the store 408 procStore.stop(false); 409 410 // Remove 4 byte from the trailer 411 FileStatus[] logs = fs.listStatus(logDir); 412 assertEquals(1, logs.length); 413 corruptLog(logs[0], 4); 414 415 LoadCounter loader = new LoadCounter(); 416 storeRestart(loader); 417 assertEquals(100, loader.getLoadedCount()); 418 assertEquals(0, loader.getCorruptedCount()); 419 } 420 421 private static void assertUpdated(final ProcedureStoreTracker tracker, 422 final Procedure<?>[] procs, final int[] updatedProcs, final int[] nonUpdatedProcs) { 423 for (int index : updatedProcs) { 424 long procId = procs[index].getProcId(); 425 assertTrue("Procedure id : " + procId, tracker.isModified(procId)); 426 } 427 for (int index : nonUpdatedProcs) { 428 long procId = procs[index].getProcId(); 429 assertFalse("Procedure id : " + procId, tracker.isModified(procId)); 430 } 431 } 432 433 private static void assertDeleted(final ProcedureStoreTracker tracker, 434 final Procedure<?>[] procs, final int[] deletedProcs, final int[] nonDeletedProcs) { 435 for (int index : deletedProcs) { 436 long procId = procs[index].getProcId(); 437 assertEquals("Procedure id : " + procId, 438 ProcedureStoreTracker.DeleteState.YES, tracker.isDeleted(procId)); 439 } 440 for (int index : nonDeletedProcs) { 441 long procId = procs[index].getProcId(); 442 assertEquals("Procedure id : " + procId, 443 ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(procId)); 444 } 445 } 446 447 @Test 448 public void testCorruptedTrailersRebuild() throws Exception { 449 final Procedure<?>[] procs = new Procedure[6]; 450 for (int i = 0; i < procs.length; ++i) { 451 procs[i] = new TestSequentialProcedure(); 452 } 453 // Log State (I=insert, U=updated, D=delete) 454 // | log 1 | log 2 | log 3 | 455 // 0 | I, D | | | 456 // 1 | I | | | 457 // 2 | I | D | | 458 // 3 | I | U | | 459 // 4 | | I | D | 460 // 5 | | | I | 461 procStore.insert(procs[0], null); 462 procStore.insert(procs[1], null); 463 procStore.insert(procs[2], null); 464 procStore.insert(procs[3], null); 465 procStore.delete(procs[0].getProcId()); 466 procStore.rollWriterForTesting(); 467 procStore.delete(procs[2].getProcId()); 468 procStore.update(procs[3]); 469 procStore.insert(procs[4], null); 470 procStore.rollWriterForTesting(); 471 procStore.delete(procs[4].getProcId()); 472 procStore.insert(procs[5], null); 473 474 // Stop the store 475 procStore.stop(false); 476 477 // Remove 4 byte from the trailers 478 final FileStatus[] logs = fs.listStatus(logDir); 479 assertEquals(3, logs.length); 480 for (int i = 0; i < logs.length; ++i) { 481 corruptLog(logs[i], 4); 482 } 483 484 // Restart the store (avoid cleaning up the files, to check the rebuilded trackers) 485 htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false); 486 final LoadCounter loader = new LoadCounter(); 487 storeRestart(loader); 488 assertEquals(3, loader.getLoadedCount()); // procs 1, 3 and 5 489 assertEquals(0, loader.getCorruptedCount()); 490 491 // Check the Trackers 492 final ArrayList<ProcedureWALFile> walFiles = procStore.getActiveLogs(); 493 LOG.info("WALs " + walFiles); 494 assertEquals(4, walFiles.size()); 495 LOG.info("Checking wal " + walFiles.get(0)); 496 assertUpdated(walFiles.get(0).getTracker(), procs, new int[]{0, 1, 2, 3}, new int[] {4, 5}); 497 LOG.info("Checking wal " + walFiles.get(1)); 498 assertUpdated(walFiles.get(1).getTracker(), procs, new int[]{2, 3, 4}, new int[] {0, 1, 5}); 499 LOG.info("Checking wal " + walFiles.get(2)); 500 assertUpdated(walFiles.get(2).getTracker(), procs, new int[]{4, 5}, new int[] {0, 1, 2, 3}); 501 LOG.info("Checking global tracker "); 502 assertDeleted(procStore.getStoreTracker(), procs, new int[]{0, 2, 4}, new int[] {1, 3, 5}); 503 } 504 505 @Test 506 public void testCorruptedEntries() throws Exception { 507 // Insert something 508 for (int i = 0; i < 100; ++i) { 509 procStore.insert(new TestSequentialProcedure(), null); 510 } 511 512 // Stop the store 513 procStore.stop(false); 514 515 // Remove some byte from the log 516 // (enough to cut the trailer and corrupt some entries) 517 FileStatus[] logs = fs.listStatus(logDir); 518 assertEquals(1, logs.length); 519 corruptLog(logs[0], 1823); 520 521 LoadCounter loader = new LoadCounter(); 522 storeRestart(loader); 523 assertTrue(procStore.getCorruptedLogs() != null); 524 assertEquals(1, procStore.getCorruptedLogs().size()); 525 assertEquals(87, loader.getLoadedCount()); 526 assertEquals(0, loader.getCorruptedCount()); 527 } 528 529 @Test 530 public void testCorruptedProcedures() throws Exception { 531 // Insert root-procedures 532 TestProcedure[] rootProcs = new TestProcedure[10]; 533 for (int i = 1; i <= rootProcs.length; i++) { 534 rootProcs[i-1] = new TestProcedure(i, 0); 535 procStore.insert(rootProcs[i-1], null); 536 rootProcs[i-1].addStackId(0); 537 procStore.update(rootProcs[i-1]); 538 } 539 // insert root-child txn 540 procStore.rollWriterForTesting(); 541 for (int i = 1; i <= rootProcs.length; i++) { 542 TestProcedure b = new TestProcedure(rootProcs.length + i, i); 543 rootProcs[i-1].addStackId(1); 544 procStore.insert(rootProcs[i-1], new Procedure[] { b }); 545 } 546 // insert child updates 547 procStore.rollWriterForTesting(); 548 for (int i = 1; i <= rootProcs.length; i++) { 549 procStore.update(new TestProcedure(rootProcs.length + i, i)); 550 } 551 552 // Stop the store 553 procStore.stop(false); 554 555 // the first log was removed, 556 // we have insert-txn and updates in the others so everything is fine 557 FileStatus[] logs = fs.listStatus(logDir); 558 assertEquals(Arrays.toString(logs), 2, logs.length); 559 Arrays.sort(logs, new Comparator<FileStatus>() { 560 @Override 561 public int compare(FileStatus o1, FileStatus o2) { 562 return o1.getPath().getName().compareTo(o2.getPath().getName()); 563 } 564 }); 565 566 LoadCounter loader = new LoadCounter(); 567 storeRestart(loader); 568 assertEquals(rootProcs.length * 2, loader.getLoadedCount()); 569 assertEquals(0, loader.getCorruptedCount()); 570 571 // Remove the second log, we have lost all the root/parent references 572 fs.delete(logs[0].getPath(), false); 573 loader.reset(); 574 storeRestart(loader); 575 assertEquals(0, loader.getLoadedCount()); 576 assertEquals(rootProcs.length, loader.getCorruptedCount()); 577 for (Procedure<?> proc : loader.getCorrupted()) { 578 assertTrue(proc.toString(), proc.getParentProcId() <= rootProcs.length); 579 assertTrue(proc.toString(), 580 proc.getProcId() > rootProcs.length && proc.getProcId() <= (rootProcs.length * 2)); 581 } 582 } 583 584 @Test 585 public void testRollAndRemove() throws IOException { 586 // Insert something in the log 587 Procedure<?> proc1 = new TestSequentialProcedure(); 588 procStore.insert(proc1, null); 589 590 Procedure<?> proc2 = new TestSequentialProcedure(); 591 procStore.insert(proc2, null); 592 593 // roll the log, now we have 2 594 procStore.rollWriterForTesting(); 595 assertEquals(2, procStore.getActiveLogs().size()); 596 597 // everything will be up to date in the second log 598 // so we can remove the first one 599 procStore.update(proc1); 600 procStore.update(proc2); 601 assertEquals(1, procStore.getActiveLogs().size()); 602 603 // roll the log, now we have 2 604 procStore.rollWriterForTesting(); 605 assertEquals(2, procStore.getActiveLogs().size()); 606 607 // remove everything active 608 // so we can remove all the logs 609 procStore.delete(proc1.getProcId()); 610 procStore.delete(proc2.getProcId()); 611 assertEquals(1, procStore.getActiveLogs().size()); 612 } 613 614 @Test 615 public void testFileNotFoundDuringLeaseRecovery() throws IOException { 616 final TestProcedure[] procs = new TestProcedure[3]; 617 for (int i = 0; i < procs.length; ++i) { 618 procs[i] = new TestProcedure(i + 1, 0); 619 procStore.insert(procs[i], null); 620 } 621 procStore.rollWriterForTesting(); 622 for (int i = 0; i < procs.length; ++i) { 623 procStore.update(procs[i]); 624 procStore.rollWriterForTesting(); 625 } 626 procStore.stop(false); 627 628 FileStatus[] status = fs.listStatus(logDir); 629 assertEquals(procs.length + 1, status.length); 630 631 // simulate another active master removing the wals 632 procStore = new WALProcedureStore(htu.getConfiguration(), logDir, null, 633 new LeaseRecovery() { 634 private int count = 0; 635 636 @Override 637 public void recoverFileLease(FileSystem fs, Path path) throws IOException { 638 if (++count <= 2) { 639 fs.delete(path, false); 640 LOG.debug("Simulate FileNotFound at count=" + count + " for " + path); 641 throw new FileNotFoundException("test file not found " + path); 642 } 643 LOG.debug("Simulate recoverFileLease() at count=" + count + " for " + path); 644 } 645 }); 646 647 final LoadCounter loader = new LoadCounter(); 648 procStore.start(PROCEDURE_STORE_SLOTS); 649 procStore.recoverLease(); 650 procStore.load(loader); 651 assertEquals(procs.length, loader.getMaxProcId()); 652 assertEquals(1, loader.getRunnableCount()); 653 assertEquals(0, loader.getCompletedCount()); 654 assertEquals(0, loader.getCorruptedCount()); 655 } 656 657 @Test 658 public void testLogFileAlreadyExists() throws IOException { 659 final boolean[] tested = {false}; 660 WALProcedureStore mStore = Mockito.spy(procStore); 661 662 Answer<Boolean> ans = new Answer<Boolean>() { 663 @Override 664 public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable { 665 long logId = ((Long) invocationOnMock.getArgument(0)).longValue(); 666 switch ((int) logId) { 667 case 2: 668 // Create a file so that real rollWriter() runs into file exists condition 669 Path logFilePath = mStore.getLogFilePath(logId); 670 mStore.getFileSystem().create(logFilePath); 671 break; 672 case 3: 673 // Success only when we retry with logId 3 674 tested[0] = true; 675 default: 676 break; 677 } 678 return (Boolean) invocationOnMock.callRealMethod(); 679 } 680 }; 681 682 // First time Store has one log file, next id will be 2 683 Mockito.doAnswer(ans).when(mStore).rollWriter(2); 684 // next time its 3 685 Mockito.doAnswer(ans).when(mStore).rollWriter(3); 686 687 mStore.recoverLease(); 688 assertTrue(tested[0]); 689 } 690 691 @Test 692 public void testLoadChildren() throws Exception { 693 TestProcedure a = new TestProcedure(1, 0); 694 TestProcedure b = new TestProcedure(2, 1); 695 TestProcedure c = new TestProcedure(3, 1); 696 697 // INIT 698 procStore.insert(a, null); 699 700 // Run A first step 701 a.addStackId(0); 702 procStore.update(a); 703 704 // Run A second step 705 a.addStackId(1); 706 procStore.insert(a, new Procedure[] { b, c }); 707 708 // Run B first step 709 b.addStackId(2); 710 procStore.update(b); 711 712 // Run C first and last step 713 c.addStackId(3); 714 procStore.update(c); 715 716 // Run B second setp 717 b.addStackId(4); 718 procStore.update(b); 719 720 // back to A 721 a.addStackId(5); 722 a.setSuccessState(); 723 procStore.delete(a, new long[] { b.getProcId(), c.getProcId() }); 724 restartAndAssert(3, 0, 1, 0); 725 } 726 727 @Test 728 public void testBatchDelete() throws Exception { 729 for (int i = 1; i < 10; ++i) { 730 procStore.insert(new TestProcedure(i), null); 731 } 732 733 // delete nothing 734 long[] toDelete = new long[] { 1, 2, 3, 4 }; 735 procStore.delete(toDelete, 2, 0); 736 LoadCounter loader = restartAndAssert(9, 9, 0, 0); 737 for (int i = 1; i < 10; ++i) { 738 assertEquals(true, loader.isRunnable(i)); 739 } 740 741 // delete the full "toDelete" array (2, 4, 6, 8) 742 toDelete = new long[] { 2, 4, 6, 8 }; 743 procStore.delete(toDelete, 0, toDelete.length); 744 loader = restartAndAssert(9, 5, 0, 0); 745 for (int i = 1; i < 10; ++i) { 746 assertEquals(i % 2 != 0, loader.isRunnable(i)); 747 } 748 749 // delete a slice of "toDelete" (1, 3) 750 toDelete = new long[] { 5, 7, 1, 3, 9 }; 751 procStore.delete(toDelete, 2, 2); 752 loader = restartAndAssert(9, 3, 0, 0); 753 for (int i = 1; i < 10; ++i) { 754 assertEquals(i > 3 && i % 2 != 0, loader.isRunnable(i)); 755 } 756 757 // delete a single item (5) 758 toDelete = new long[] { 5 }; 759 procStore.delete(toDelete, 0, 1); 760 loader = restartAndAssert(9, 2, 0, 0); 761 for (int i = 1; i < 10; ++i) { 762 assertEquals(i > 5 && i % 2 != 0, loader.isRunnable(i)); 763 } 764 765 // delete remaining using a slice of "toDelete" (7, 9) 766 toDelete = new long[] { 0, 7, 9 }; 767 procStore.delete(toDelete, 1, 2); 768 loader = restartAndAssert(0, 0, 0, 0); 769 for (int i = 1; i < 10; ++i) { 770 assertEquals(false, loader.isRunnable(i)); 771 } 772 } 773 774 @Test 775 public void testBatchInsert() throws Exception { 776 final int count = 10; 777 final TestProcedure[] procs = new TestProcedure[count]; 778 for (int i = 0; i < procs.length; ++i) { 779 procs[i] = new TestProcedure(i + 1); 780 } 781 procStore.insert(procs); 782 restartAndAssert(count, count, 0, 0); 783 784 for (int i = 0; i < procs.length; ++i) { 785 final long procId = procs[i].getProcId(); 786 procStore.delete(procId); 787 restartAndAssert(procId != count ? count : 0, count - (i + 1), 0, 0); 788 } 789 procStore.removeInactiveLogsForTesting(); 790 assertEquals("WALs=" + procStore.getActiveLogs(), 1, procStore.getActiveLogs().size()); 791 } 792 793 @Test 794 public void testWALDirAndWALArchiveDir() throws IOException { 795 Configuration conf = htu.getConfiguration(); 796 procStore = createWALProcedureStore(conf); 797 assertEquals(procStore.getFileSystem(), procStore.getWalArchiveDir().getFileSystem(conf)); 798 } 799 800 private WALProcedureStore createWALProcedureStore(Configuration conf) throws IOException { 801 return new WALProcedureStore(conf, new LeaseRecovery() { 802 @Override 803 public void recoverFileLease(FileSystem fs, Path path) throws IOException { 804 // no-op 805 } 806 }); 807 } 808 809 private LoadCounter restartAndAssert(long maxProcId, long runnableCount, 810 int completedCount, int corruptedCount) throws Exception { 811 return ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId, 812 runnableCount, completedCount, corruptedCount); 813 } 814 815 private void corruptLog(final FileStatus logFile, final long dropBytes) 816 throws IOException { 817 assertTrue(logFile.getLen() > dropBytes); 818 LOG.debug("corrupt log " + logFile.getPath() + 819 " size=" + logFile.getLen() + " drop=" + dropBytes); 820 Path tmpPath = new Path(testDir, "corrupted.log"); 821 InputStream in = fs.open(logFile.getPath()); 822 OutputStream out = fs.create(tmpPath); 823 IOUtils.copyBytes(in, out, logFile.getLen() - dropBytes, true); 824 if (!fs.rename(tmpPath, logFile.getPath())) { 825 throw new IOException("Unable to rename"); 826 } 827 } 828 829 private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception { 830 LOG.debug("expected: " + procIds); 831 LoadCounter loader = new LoadCounter(); 832 storeRestart(loader); 833 assertEquals(procIds.size(), loader.getLoadedCount()); 834 assertEquals(0, loader.getCorruptedCount()); 835 } 836 837 public static class TestSequentialProcedure extends SequentialProcedure<Void> { 838 private static long seqid = 0; 839 840 public TestSequentialProcedure() { 841 setProcId(++seqid); 842 } 843 844 @Override 845 protected Procedure<Void>[] execute(Void env) { 846 return null; 847 } 848 849 @Override 850 protected void rollback(Void env) { 851 } 852 853 @Override 854 protected boolean abort(Void env) { 855 return false; 856 } 857 858 @Override 859 protected void serializeStateData(ProcedureStateSerializer serializer) 860 throws IOException { 861 long procId = getProcId(); 862 if (procId % 2 == 0) { 863 Int64Value.Builder builder = Int64Value.newBuilder().setValue(procId); 864 serializer.serialize(builder.build()); 865 } 866 } 867 868 @Override 869 protected void deserializeStateData(ProcedureStateSerializer serializer) 870 throws IOException { 871 long procId = getProcId(); 872 if (procId % 2 == 0) { 873 Int64Value value = serializer.deserialize(Int64Value.class); 874 assertEquals(procId, value.getValue()); 875 } 876 } 877 } 878}