001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2.store.wal; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.FileNotFoundException; 025import java.io.IOException; 026import java.io.InputStream; 027import java.io.OutputStream; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Comparator; 031import java.util.HashSet; 032import java.util.Set; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileStatus; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 039import org.apache.hadoop.hbase.procedure2.Procedure; 040import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 041import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 042import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter; 043import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; 044import org.apache.hadoop.hbase.procedure2.SequentialProcedure; 045import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 046import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; 047import org.apache.hadoop.hbase.testclassification.MasterTests; 048import org.apache.hadoop.hbase.testclassification.SmallTests; 049import org.apache.hadoop.io.IOUtils; 050import org.junit.After; 051import org.junit.Before; 052import org.junit.ClassRule; 053import org.junit.Test; 054import org.junit.experimental.categories.Category; 055import org.mockito.Mockito; 056import org.mockito.invocation.InvocationOnMock; 057import org.mockito.stubbing.Answer; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value; 062 063@Category({MasterTests.class, SmallTests.class}) 064public class TestWALProcedureStore { 065 @ClassRule 066 public static final HBaseClassTestRule CLASS_RULE = 067 HBaseClassTestRule.forClass(TestWALProcedureStore.class); 068 069 private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStore.class); 070 071 private static final int PROCEDURE_STORE_SLOTS = 1; 072 073 private WALProcedureStore procStore; 074 075 private HBaseCommonTestingUtility htu; 076 private FileSystem fs; 077 private Path testDir; 078 private Path logDir; 079 080 private void setupConfig(final Configuration conf) { 081 conf.setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, true); 082 } 083 084 @Before 085 public void setUp() throws IOException { 086 htu = new HBaseCommonTestingUtility(); 087 testDir = htu.getDataTestDir(); 088 fs = testDir.getFileSystem(htu.getConfiguration()); 089 assertTrue(testDir.depth() > 1); 090 091 setupConfig(htu.getConfiguration()); 092 logDir = new Path(testDir, "proc-logs"); 093 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); 094 procStore.start(PROCEDURE_STORE_SLOTS); 095 procStore.recoverLease(); 096 procStore.load(new LoadCounter()); 097 } 098 099 @After 100 public void tearDown() throws IOException { 101 procStore.stop(false); 102 fs.delete(logDir, true); 103 } 104 105 private void storeRestart(ProcedureStore.ProcedureLoader loader) throws Exception { 106 ProcedureTestingUtility.storeRestart(procStore, loader); 107 } 108 109 @Test 110 public void testEmptyRoll() throws Exception { 111 for (int i = 0; i < 10; ++i) { 112 procStore.periodicRollForTesting(); 113 } 114 assertEquals(1, procStore.getActiveLogs().size()); 115 FileStatus[] status = fs.listStatus(logDir); 116 assertEquals(1, status.length); 117 } 118 119 @Test 120 public void testRestartWithoutData() throws Exception { 121 for (int i = 0; i < 10; ++i) { 122 final LoadCounter loader = new LoadCounter(); 123 storeRestart(loader); 124 } 125 LOG.info("ACTIVE WALs " + procStore.getActiveLogs()); 126 assertEquals(1, procStore.getActiveLogs().size()); 127 FileStatus[] status = fs.listStatus(logDir); 128 assertEquals(1, status.length); 129 } 130 131 /** 132 * Tests that tracker for all old logs are loaded back after procedure store is restarted. 133 */ 134 @Test 135 public void trackersLoadedForAllOldLogs() throws Exception { 136 for (int i = 0; i <= 20; ++i) { 137 procStore.insert(new TestProcedure(i), null); 138 if (i > 0 && (i % 5) == 0) { 139 LoadCounter loader = new LoadCounter(); 140 storeRestart(loader); 141 } 142 } 143 assertEquals(5, procStore.getActiveLogs().size()); 144 for (int i = 0; i < procStore.getActiveLogs().size() - 1; ++i) { 145 ProcedureStoreTracker tracker = procStore.getActiveLogs().get(i).getTracker(); 146 assertTrue(tracker != null && !tracker.isEmpty()); 147 } 148 } 149 150 @Test 151 public void testWalCleanerSequentialClean() throws Exception { 152 final Procedure<?>[] procs = new Procedure[5]; 153 ArrayList<ProcedureWALFile> logs = null; 154 155 // Insert procedures and roll wal after every insert. 156 for (int i = 0; i < procs.length; i++) { 157 procs[i] = new TestSequentialProcedure(); 158 procStore.insert(procs[i], null); 159 procStore.rollWriterForTesting(); 160 logs = procStore.getActiveLogs(); 161 assertEquals(logs.size(), i + 2); // Extra 1 for current ongoing wal. 162 } 163 164 // Delete procedures in sequential order make sure that only the corresponding wal is deleted 165 // from logs list. 166 final int[] deleteOrder = new int[] { 0, 1, 2, 3, 4 }; 167 for (int i = 0; i < deleteOrder.length; i++) { 168 procStore.delete(procs[deleteOrder[i]].getProcId()); 169 procStore.removeInactiveLogsForTesting(); 170 assertFalse(logs.get(deleteOrder[i]).toString(), 171 procStore.getActiveLogs().contains(logs.get(deleteOrder[i]))); 172 assertEquals(procStore.getActiveLogs().size(), procs.length - i); 173 } 174 } 175 176 177 // Test that wal cleaner doesn't create holes in wal files list i.e. it only deletes files if 178 // they are in the starting of the list. 179 @Test 180 public void testWalCleanerNoHoles() throws Exception { 181 final Procedure<?>[] procs = new Procedure[5]; 182 ArrayList<ProcedureWALFile> logs = null; 183 // Insert procedures and roll wal after every insert. 184 for (int i = 0; i < procs.length; i++) { 185 procs[i] = new TestSequentialProcedure(); 186 procStore.insert(procs[i], null); 187 procStore.rollWriterForTesting(); 188 logs = procStore.getActiveLogs(); 189 assertEquals(i + 2, logs.size()); // Extra 1 for current ongoing wal. 190 } 191 192 for (int i = 1; i < procs.length; i++) { 193 procStore.delete(procs[i].getProcId()); 194 } 195 assertEquals(procs.length + 1, procStore.getActiveLogs().size()); 196 procStore.delete(procs[0].getProcId()); 197 assertEquals(1, procStore.getActiveLogs().size()); 198 } 199 200 @Test 201 public void testWalCleanerUpdates() throws Exception { 202 TestSequentialProcedure p1 = new TestSequentialProcedure(); 203 TestSequentialProcedure p2 = new TestSequentialProcedure(); 204 procStore.insert(p1, null); 205 procStore.insert(p2, null); 206 procStore.rollWriterForTesting(); 207 ProcedureWALFile firstLog = procStore.getActiveLogs().get(0); 208 procStore.update(p1); 209 procStore.rollWriterForTesting(); 210 procStore.update(p2); 211 procStore.rollWriterForTesting(); 212 procStore.removeInactiveLogsForTesting(); 213 assertFalse(procStore.getActiveLogs().contains(firstLog)); 214 } 215 216 @Test 217 public void testWalCleanerUpdatesDontLeaveHoles() throws Exception { 218 TestSequentialProcedure p1 = new TestSequentialProcedure(); 219 TestSequentialProcedure p2 = new TestSequentialProcedure(); 220 procStore.insert(p1, null); 221 procStore.insert(p2, null); 222 procStore.rollWriterForTesting(); // generates first log with p1 + p2 223 ProcedureWALFile log1 = procStore.getActiveLogs().get(0); 224 procStore.update(p2); 225 procStore.rollWriterForTesting(); // generates second log with p2 226 ProcedureWALFile log2 = procStore.getActiveLogs().get(1); 227 procStore.update(p2); 228 procStore.rollWriterForTesting(); // generates third log with p2 229 procStore.removeInactiveLogsForTesting(); // Shouldn't remove 2nd log. 230 assertEquals(4, procStore.getActiveLogs().size()); 231 procStore.update(p1); 232 procStore.rollWriterForTesting(); // generates fourth log with p1 233 procStore.removeInactiveLogsForTesting(); // Should remove first two logs. 234 assertEquals(3, procStore.getActiveLogs().size()); 235 assertFalse(procStore.getActiveLogs().contains(log1)); 236 assertFalse(procStore.getActiveLogs().contains(log2)); 237 } 238 239 @Test 240 public void testWalCleanerWithEmptyRolls() throws Exception { 241 final Procedure<?>[] procs = new Procedure[3]; 242 for (int i = 0; i < procs.length; ++i) { 243 procs[i] = new TestSequentialProcedure(); 244 procStore.insert(procs[i], null); 245 } 246 assertEquals(1, procStore.getActiveLogs().size()); 247 procStore.rollWriterForTesting(); 248 assertEquals(2, procStore.getActiveLogs().size()); 249 procStore.rollWriterForTesting(); 250 assertEquals(3, procStore.getActiveLogs().size()); 251 252 for (int i = 0; i < procs.length; ++i) { 253 procStore.update(procs[i]); 254 procStore.rollWriterForTesting(); 255 procStore.rollWriterForTesting(); 256 if (i < (procs.length - 1)) { 257 assertEquals(3 + ((i + 1) * 2), procStore.getActiveLogs().size()); 258 } 259 } 260 assertEquals(7, procStore.getActiveLogs().size()); 261 262 for (int i = 0; i < procs.length; ++i) { 263 procStore.delete(procs[i].getProcId()); 264 assertEquals(7 - ((i + 1) * 2), procStore.getActiveLogs().size()); 265 } 266 assertEquals(1, procStore.getActiveLogs().size()); 267 } 268 269 @Test 270 public void testEmptyLogLoad() throws Exception { 271 LoadCounter loader = new LoadCounter(); 272 storeRestart(loader); 273 assertEquals(0, loader.getMaxProcId()); 274 assertEquals(0, loader.getLoadedCount()); 275 assertEquals(0, loader.getCorruptedCount()); 276 } 277 278 @Test 279 public void testLoad() throws Exception { 280 Set<Long> procIds = new HashSet<>(); 281 282 // Insert something in the log 283 Procedure<?> proc1 = new TestSequentialProcedure(); 284 procIds.add(proc1.getProcId()); 285 procStore.insert(proc1, null); 286 287 Procedure<?> proc2 = new TestSequentialProcedure(); 288 Procedure<?>[] child2 = new Procedure[2]; 289 child2[0] = new TestSequentialProcedure(); 290 child2[1] = new TestSequentialProcedure(); 291 292 procIds.add(proc2.getProcId()); 293 procIds.add(child2[0].getProcId()); 294 procIds.add(child2[1].getProcId()); 295 procStore.insert(proc2, child2); 296 297 // Verify that everything is there 298 verifyProcIdsOnRestart(procIds); 299 300 // Update and delete something 301 procStore.update(proc1); 302 procStore.update(child2[1]); 303 procStore.delete(child2[1].getProcId()); 304 procIds.remove(child2[1].getProcId()); 305 306 // Verify that everything is there 307 verifyProcIdsOnRestart(procIds); 308 309 // Remove 4 byte from the trailers 310 procStore.stop(false); 311 FileStatus[] logs = fs.listStatus(logDir); 312 assertEquals(3, logs.length); 313 for (int i = 0; i < logs.length; ++i) { 314 corruptLog(logs[i], 4); 315 } 316 verifyProcIdsOnRestart(procIds); 317 } 318 319 @Test 320 public void testNoTrailerDoubleRestart() throws Exception { 321 // log-0001: proc 0, 1 and 2 are inserted 322 Procedure<?> proc0 = new TestSequentialProcedure(); 323 procStore.insert(proc0, null); 324 Procedure<?> proc1 = new TestSequentialProcedure(); 325 procStore.insert(proc1, null); 326 Procedure<?> proc2 = new TestSequentialProcedure(); 327 procStore.insert(proc2, null); 328 procStore.rollWriterForTesting(); 329 330 // log-0002: proc 1 deleted 331 procStore.delete(proc1.getProcId()); 332 procStore.rollWriterForTesting(); 333 334 // log-0003: proc 2 is update 335 procStore.update(proc2); 336 procStore.rollWriterForTesting(); 337 338 // log-0004: proc 2 deleted 339 procStore.delete(proc2.getProcId()); 340 341 // stop the store and remove the trailer 342 procStore.stop(false); 343 FileStatus[] logs = fs.listStatus(logDir); 344 assertEquals(4, logs.length); 345 for (int i = 0; i < logs.length; ++i) { 346 corruptLog(logs[i], 4); 347 } 348 349 // Test Load 1 350 // Restart the store (avoid cleaning up the files, to check the rebuilded trackers) 351 htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false); 352 LoadCounter loader = new LoadCounter(); 353 storeRestart(loader); 354 assertEquals(1, loader.getLoadedCount()); 355 assertEquals(0, loader.getCorruptedCount()); 356 357 // Test Load 2 358 assertEquals(5, fs.listStatus(logDir).length); 359 loader = new LoadCounter(); 360 storeRestart(loader); 361 assertEquals(1, loader.getLoadedCount()); 362 assertEquals(0, loader.getCorruptedCount()); 363 364 // remove proc-0 365 procStore.delete(proc0.getProcId()); 366 procStore.periodicRollForTesting(); 367 assertEquals(1, fs.listStatus(logDir).length); 368 storeRestart(loader); 369 } 370 371 @Test 372 public void testProcIdHoles() throws Exception { 373 // Insert 374 for (int i = 0; i < 100; i += 2) { 375 procStore.insert(new TestProcedure(i), null); 376 if (i > 0 && (i % 10) == 0) { 377 LoadCounter loader = new LoadCounter(); 378 storeRestart(loader); 379 assertEquals(0, loader.getCorruptedCount()); 380 assertEquals((i / 2) + 1, loader.getLoadedCount()); 381 } 382 } 383 assertEquals(10, procStore.getActiveLogs().size()); 384 385 // Delete 386 for (int i = 0; i < 100; i += 2) { 387 procStore.delete(i); 388 } 389 assertEquals(1, procStore.getActiveLogs().size()); 390 391 LoadCounter loader = new LoadCounter(); 392 storeRestart(loader); 393 assertEquals(0, loader.getLoadedCount()); 394 assertEquals(0, loader.getCorruptedCount()); 395 } 396 397 @Test 398 public void testCorruptedTrailer() throws Exception { 399 // Insert something 400 for (int i = 0; i < 100; ++i) { 401 procStore.insert(new TestSequentialProcedure(), null); 402 } 403 404 // Stop the store 405 procStore.stop(false); 406 407 // Remove 4 byte from the trailer 408 FileStatus[] logs = fs.listStatus(logDir); 409 assertEquals(1, logs.length); 410 corruptLog(logs[0], 4); 411 412 LoadCounter loader = new LoadCounter(); 413 storeRestart(loader); 414 assertEquals(100, loader.getLoadedCount()); 415 assertEquals(0, loader.getCorruptedCount()); 416 } 417 418 private static void assertUpdated(final ProcedureStoreTracker tracker, 419 final Procedure<?>[] procs, final int[] updatedProcs, final int[] nonUpdatedProcs) { 420 for (int index : updatedProcs) { 421 long procId = procs[index].getProcId(); 422 assertTrue("Procedure id : " + procId, tracker.isModified(procId)); 423 } 424 for (int index : nonUpdatedProcs) { 425 long procId = procs[index].getProcId(); 426 assertFalse("Procedure id : " + procId, tracker.isModified(procId)); 427 } 428 } 429 430 private static void assertDeleted(final ProcedureStoreTracker tracker, 431 final Procedure<?>[] procs, final int[] deletedProcs, final int[] nonDeletedProcs) { 432 for (int index : deletedProcs) { 433 long procId = procs[index].getProcId(); 434 assertEquals("Procedure id : " + procId, 435 ProcedureStoreTracker.DeleteState.YES, tracker.isDeleted(procId)); 436 } 437 for (int index : nonDeletedProcs) { 438 long procId = procs[index].getProcId(); 439 assertEquals("Procedure id : " + procId, 440 ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(procId)); 441 } 442 } 443 444 @Test 445 public void testCorruptedTrailersRebuild() throws Exception { 446 final Procedure<?>[] procs = new Procedure[6]; 447 for (int i = 0; i < procs.length; ++i) { 448 procs[i] = new TestSequentialProcedure(); 449 } 450 // Log State (I=insert, U=updated, D=delete) 451 // | log 1 | log 2 | log 3 | 452 // 0 | I, D | | | 453 // 1 | I | | | 454 // 2 | I | D | | 455 // 3 | I | U | | 456 // 4 | | I | D | 457 // 5 | | | I | 458 procStore.insert(procs[0], null); 459 procStore.insert(procs[1], null); 460 procStore.insert(procs[2], null); 461 procStore.insert(procs[3], null); 462 procStore.delete(procs[0].getProcId()); 463 procStore.rollWriterForTesting(); 464 procStore.delete(procs[2].getProcId()); 465 procStore.update(procs[3]); 466 procStore.insert(procs[4], null); 467 procStore.rollWriterForTesting(); 468 procStore.delete(procs[4].getProcId()); 469 procStore.insert(procs[5], null); 470 471 // Stop the store 472 procStore.stop(false); 473 474 // Remove 4 byte from the trailers 475 final FileStatus[] logs = fs.listStatus(logDir); 476 assertEquals(3, logs.length); 477 for (int i = 0; i < logs.length; ++i) { 478 corruptLog(logs[i], 4); 479 } 480 481 // Restart the store (avoid cleaning up the files, to check the rebuilded trackers) 482 htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false); 483 final LoadCounter loader = new LoadCounter(); 484 storeRestart(loader); 485 assertEquals(3, loader.getLoadedCount()); // procs 1, 3 and 5 486 assertEquals(0, loader.getCorruptedCount()); 487 488 // Check the Trackers 489 final ArrayList<ProcedureWALFile> walFiles = procStore.getActiveLogs(); 490 LOG.info("WALs " + walFiles); 491 assertEquals(4, walFiles.size()); 492 LOG.info("Checking wal " + walFiles.get(0)); 493 assertUpdated(walFiles.get(0).getTracker(), procs, new int[]{0, 1, 2, 3}, new int[] {4, 5}); 494 LOG.info("Checking wal " + walFiles.get(1)); 495 assertUpdated(walFiles.get(1).getTracker(), procs, new int[]{2, 3, 4}, new int[] {0, 1, 5}); 496 LOG.info("Checking wal " + walFiles.get(2)); 497 assertUpdated(walFiles.get(2).getTracker(), procs, new int[]{4, 5}, new int[] {0, 1, 2, 3}); 498 LOG.info("Checking global tracker "); 499 assertDeleted(procStore.getStoreTracker(), procs, new int[]{0, 2, 4}, new int[] {1, 3, 5}); 500 } 501 502 @Test 503 public void testCorruptedEntries() throws Exception { 504 // Insert something 505 for (int i = 0; i < 100; ++i) { 506 procStore.insert(new TestSequentialProcedure(), null); 507 } 508 509 // Stop the store 510 procStore.stop(false); 511 512 // Remove some byte from the log 513 // (enough to cut the trailer and corrupt some entries) 514 FileStatus[] logs = fs.listStatus(logDir); 515 assertEquals(1, logs.length); 516 corruptLog(logs[0], 1823); 517 518 LoadCounter loader = new LoadCounter(); 519 storeRestart(loader); 520 assertTrue(procStore.getCorruptedLogs() != null); 521 assertEquals(1, procStore.getCorruptedLogs().size()); 522 assertEquals(87, loader.getLoadedCount()); 523 assertEquals(0, loader.getCorruptedCount()); 524 } 525 526 @Test 527 public void testCorruptedProcedures() throws Exception { 528 // Insert root-procedures 529 TestProcedure[] rootProcs = new TestProcedure[10]; 530 for (int i = 1; i <= rootProcs.length; i++) { 531 rootProcs[i-1] = new TestProcedure(i, 0); 532 procStore.insert(rootProcs[i-1], null); 533 rootProcs[i-1].addStackId(0); 534 procStore.update(rootProcs[i-1]); 535 } 536 // insert root-child txn 537 procStore.rollWriterForTesting(); 538 for (int i = 1; i <= rootProcs.length; i++) { 539 TestProcedure b = new TestProcedure(rootProcs.length + i, i); 540 rootProcs[i-1].addStackId(1); 541 procStore.insert(rootProcs[i-1], new Procedure[] { b }); 542 } 543 // insert child updates 544 procStore.rollWriterForTesting(); 545 for (int i = 1; i <= rootProcs.length; i++) { 546 procStore.update(new TestProcedure(rootProcs.length + i, i)); 547 } 548 549 // Stop the store 550 procStore.stop(false); 551 552 // the first log was removed, 553 // we have insert-txn and updates in the others so everything is fine 554 FileStatus[] logs = fs.listStatus(logDir); 555 assertEquals(Arrays.toString(logs), 2, logs.length); 556 Arrays.sort(logs, new Comparator<FileStatus>() { 557 @Override 558 public int compare(FileStatus o1, FileStatus o2) { 559 return o1.getPath().getName().compareTo(o2.getPath().getName()); 560 } 561 }); 562 563 LoadCounter loader = new LoadCounter(); 564 storeRestart(loader); 565 assertEquals(rootProcs.length * 2, loader.getLoadedCount()); 566 assertEquals(0, loader.getCorruptedCount()); 567 568 // Remove the second log, we have lost all the root/parent references 569 fs.delete(logs[0].getPath(), false); 570 loader.reset(); 571 storeRestart(loader); 572 assertEquals(0, loader.getLoadedCount()); 573 assertEquals(rootProcs.length, loader.getCorruptedCount()); 574 for (Procedure<?> proc : loader.getCorrupted()) { 575 assertTrue(proc.toString(), proc.getParentProcId() <= rootProcs.length); 576 assertTrue(proc.toString(), 577 proc.getProcId() > rootProcs.length && proc.getProcId() <= (rootProcs.length * 2)); 578 } 579 } 580 581 @Test 582 public void testRollAndRemove() throws IOException { 583 // Insert something in the log 584 Procedure<?> proc1 = new TestSequentialProcedure(); 585 procStore.insert(proc1, null); 586 587 Procedure<?> proc2 = new TestSequentialProcedure(); 588 procStore.insert(proc2, null); 589 590 // roll the log, now we have 2 591 procStore.rollWriterForTesting(); 592 assertEquals(2, procStore.getActiveLogs().size()); 593 594 // everything will be up to date in the second log 595 // so we can remove the first one 596 procStore.update(proc1); 597 procStore.update(proc2); 598 assertEquals(1, procStore.getActiveLogs().size()); 599 600 // roll the log, now we have 2 601 procStore.rollWriterForTesting(); 602 assertEquals(2, procStore.getActiveLogs().size()); 603 604 // remove everything active 605 // so we can remove all the logs 606 procStore.delete(proc1.getProcId()); 607 procStore.delete(proc2.getProcId()); 608 assertEquals(1, procStore.getActiveLogs().size()); 609 } 610 611 @Test 612 public void testFileNotFoundDuringLeaseRecovery() throws IOException { 613 final TestProcedure[] procs = new TestProcedure[3]; 614 for (int i = 0; i < procs.length; ++i) { 615 procs[i] = new TestProcedure(i + 1, 0); 616 procStore.insert(procs[i], null); 617 } 618 procStore.rollWriterForTesting(); 619 for (int i = 0; i < procs.length; ++i) { 620 procStore.update(procs[i]); 621 procStore.rollWriterForTesting(); 622 } 623 procStore.stop(false); 624 625 FileStatus[] status = fs.listStatus(logDir); 626 assertEquals(procs.length + 1, status.length); 627 628 // simulate another active master removing the wals 629 procStore = new WALProcedureStore(htu.getConfiguration(), logDir, null, 630 new WALProcedureStore.LeaseRecovery() { 631 private int count = 0; 632 633 @Override 634 public void recoverFileLease(FileSystem fs, Path path) throws IOException { 635 if (++count <= 2) { 636 fs.delete(path, false); 637 LOG.debug("Simulate FileNotFound at count=" + count + " for " + path); 638 throw new FileNotFoundException("test file not found " + path); 639 } 640 LOG.debug("Simulate recoverFileLease() at count=" + count + " for " + path); 641 } 642 }); 643 644 final LoadCounter loader = new LoadCounter(); 645 procStore.start(PROCEDURE_STORE_SLOTS); 646 procStore.recoverLease(); 647 procStore.load(loader); 648 assertEquals(procs.length, loader.getMaxProcId()); 649 assertEquals(1, loader.getRunnableCount()); 650 assertEquals(0, loader.getCompletedCount()); 651 assertEquals(0, loader.getCorruptedCount()); 652 } 653 654 @Test 655 public void testLogFileAleadExists() throws IOException { 656 final boolean[] tested = {false}; 657 WALProcedureStore mStore = Mockito.spy(procStore); 658 659 Answer<Boolean> ans = new Answer<Boolean>() { 660 @Override 661 public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable { 662 long logId = ((Long) invocationOnMock.getArgument(0)).longValue(); 663 switch ((int) logId) { 664 case 2: 665 // Create a file so that real rollWriter() runs into file exists condition 666 Path logFilePath = mStore.getLogFilePath(logId); 667 mStore.getFileSystem().create(logFilePath); 668 break; 669 case 3: 670 // Success only when we retry with logId 3 671 tested[0] = true; 672 default: 673 break; 674 } 675 return (Boolean) invocationOnMock.callRealMethod(); 676 } 677 }; 678 679 // First time Store has one log file, next id will be 2 680 Mockito.doAnswer(ans).when(mStore).rollWriter(2); 681 // next time its 3 682 Mockito.doAnswer(ans).when(mStore).rollWriter(3); 683 684 mStore.recoverLease(); 685 assertTrue(tested[0]); 686 } 687 688 @Test 689 public void testLoadChildren() throws Exception { 690 TestProcedure a = new TestProcedure(1, 0); 691 TestProcedure b = new TestProcedure(2, 1); 692 TestProcedure c = new TestProcedure(3, 1); 693 694 // INIT 695 procStore.insert(a, null); 696 697 // Run A first step 698 a.addStackId(0); 699 procStore.update(a); 700 701 // Run A second step 702 a.addStackId(1); 703 procStore.insert(a, new Procedure[] { b, c }); 704 705 // Run B first step 706 b.addStackId(2); 707 procStore.update(b); 708 709 // Run C first and last step 710 c.addStackId(3); 711 procStore.update(c); 712 713 // Run B second setp 714 b.addStackId(4); 715 procStore.update(b); 716 717 // back to A 718 a.addStackId(5); 719 a.setSuccessState(); 720 procStore.delete(a, new long[] { b.getProcId(), c.getProcId() }); 721 restartAndAssert(3, 0, 1, 0); 722 } 723 724 @Test 725 public void testBatchDelete() throws Exception { 726 for (int i = 1; i < 10; ++i) { 727 procStore.insert(new TestProcedure(i), null); 728 } 729 730 // delete nothing 731 long[] toDelete = new long[] { 1, 2, 3, 4 }; 732 procStore.delete(toDelete, 2, 0); 733 LoadCounter loader = restartAndAssert(9, 9, 0, 0); 734 for (int i = 1; i < 10; ++i) { 735 assertEquals(true, loader.isRunnable(i)); 736 } 737 738 // delete the full "toDelete" array (2, 4, 6, 8) 739 toDelete = new long[] { 2, 4, 6, 8 }; 740 procStore.delete(toDelete, 0, toDelete.length); 741 loader = restartAndAssert(9, 5, 0, 0); 742 for (int i = 1; i < 10; ++i) { 743 assertEquals(i % 2 != 0, loader.isRunnable(i)); 744 } 745 746 // delete a slice of "toDelete" (1, 3) 747 toDelete = new long[] { 5, 7, 1, 3, 9 }; 748 procStore.delete(toDelete, 2, 2); 749 loader = restartAndAssert(9, 3, 0, 0); 750 for (int i = 1; i < 10; ++i) { 751 assertEquals(i > 3 && i % 2 != 0, loader.isRunnable(i)); 752 } 753 754 // delete a single item (5) 755 toDelete = new long[] { 5 }; 756 procStore.delete(toDelete, 0, 1); 757 loader = restartAndAssert(9, 2, 0, 0); 758 for (int i = 1; i < 10; ++i) { 759 assertEquals(i > 5 && i % 2 != 0, loader.isRunnable(i)); 760 } 761 762 // delete remaining using a slice of "toDelete" (7, 9) 763 toDelete = new long[] { 0, 7, 9 }; 764 procStore.delete(toDelete, 1, 2); 765 loader = restartAndAssert(0, 0, 0, 0); 766 for (int i = 1; i < 10; ++i) { 767 assertEquals(false, loader.isRunnable(i)); 768 } 769 } 770 771 @Test 772 public void testBatchInsert() throws Exception { 773 final int count = 10; 774 final TestProcedure[] procs = new TestProcedure[count]; 775 for (int i = 0; i < procs.length; ++i) { 776 procs[i] = new TestProcedure(i + 1); 777 } 778 procStore.insert(procs); 779 restartAndAssert(count, count, 0, 0); 780 781 for (int i = 0; i < procs.length; ++i) { 782 final long procId = procs[i].getProcId(); 783 procStore.delete(procId); 784 restartAndAssert(procId != count ? count : 0, count - (i + 1), 0, 0); 785 } 786 procStore.removeInactiveLogsForTesting(); 787 assertEquals("WALs=" + procStore.getActiveLogs(), 1, procStore.getActiveLogs().size()); 788 } 789 790 @Test 791 public void testWALDirAndWALArchiveDir() throws IOException { 792 Configuration conf = htu.getConfiguration(); 793 procStore = createWALProcedureStore(conf); 794 assertEquals(procStore.getFileSystem(), procStore.getWalArchiveDir().getFileSystem(conf)); 795 } 796 797 private WALProcedureStore createWALProcedureStore(Configuration conf) throws IOException { 798 return new WALProcedureStore(conf, new WALProcedureStore.LeaseRecovery() { 799 @Override 800 public void recoverFileLease(FileSystem fs, Path path) throws IOException { 801 // no-op 802 } 803 }); 804 } 805 806 private LoadCounter restartAndAssert(long maxProcId, long runnableCount, 807 int completedCount, int corruptedCount) throws Exception { 808 return ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId, 809 runnableCount, completedCount, corruptedCount); 810 } 811 812 private void corruptLog(final FileStatus logFile, final long dropBytes) 813 throws IOException { 814 assertTrue(logFile.getLen() > dropBytes); 815 LOG.debug("corrupt log " + logFile.getPath() + 816 " size=" + logFile.getLen() + " drop=" + dropBytes); 817 Path tmpPath = new Path(testDir, "corrupted.log"); 818 InputStream in = fs.open(logFile.getPath()); 819 OutputStream out = fs.create(tmpPath); 820 IOUtils.copyBytes(in, out, logFile.getLen() - dropBytes, true); 821 if (!fs.rename(tmpPath, logFile.getPath())) { 822 throw new IOException("Unable to rename"); 823 } 824 } 825 826 private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception { 827 LOG.debug("expected: " + procIds); 828 LoadCounter loader = new LoadCounter(); 829 storeRestart(loader); 830 assertEquals(procIds.size(), loader.getLoadedCount()); 831 assertEquals(0, loader.getCorruptedCount()); 832 } 833 834 public static class TestSequentialProcedure extends SequentialProcedure<Void> { 835 private static long seqid = 0; 836 837 public TestSequentialProcedure() { 838 setProcId(++seqid); 839 } 840 841 @Override 842 protected Procedure<Void>[] execute(Void env) { 843 return null; 844 } 845 846 @Override 847 protected void rollback(Void env) { 848 } 849 850 @Override 851 protected boolean abort(Void env) { 852 return false; 853 } 854 855 @Override 856 protected void serializeStateData(ProcedureStateSerializer serializer) 857 throws IOException { 858 long procId = getProcId(); 859 if (procId % 2 == 0) { 860 Int64Value.Builder builder = Int64Value.newBuilder().setValue(procId); 861 serializer.serialize(builder.build()); 862 } 863 } 864 865 @Override 866 protected void deserializeStateData(ProcedureStateSerializer serializer) 867 throws IOException { 868 long procId = getProcId(); 869 if (procId % 2 == 0) { 870 Int64Value value = serializer.deserialize(Int64Value.class); 871 assertEquals(procId, value.getValue()); 872 } 873 } 874 } 875}