001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2.store.wal; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.FileNotFoundException; 025import java.io.IOException; 026import java.io.InputStream; 027import java.io.OutputStream; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Comparator; 031import java.util.HashSet; 032import java.util.Set; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileStatus; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 039import org.apache.hadoop.hbase.procedure2.Procedure; 040import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 041import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 042import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter; 043import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; 044import org.apache.hadoop.hbase.procedure2.SequentialProcedure; 045import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 046import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; 047import org.apache.hadoop.hbase.testclassification.MasterTests; 048import org.apache.hadoop.hbase.testclassification.SmallTests; 049import org.apache.hadoop.io.IOUtils; 050import org.junit.After; 051import org.junit.Before; 052import org.junit.ClassRule; 053import org.junit.Test; 054import org.junit.experimental.categories.Category; 055import org.mockito.Mockito; 056import org.mockito.invocation.InvocationOnMock; 057import org.mockito.stubbing.Answer; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value; 062 063@Category({MasterTests.class, SmallTests.class}) 064public class TestWALProcedureStore { 065 066 @ClassRule 067 public static final HBaseClassTestRule CLASS_RULE = 068 HBaseClassTestRule.forClass(TestWALProcedureStore.class); 069 070 private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStore.class); 071 072 private static final int PROCEDURE_STORE_SLOTS = 1; 073 074 private WALProcedureStore procStore; 075 076 private HBaseCommonTestingUtility htu; 077 private FileSystem fs; 078 private Path testDir; 079 private Path logDir; 080 081 private void setupConfig(final Configuration conf) { 082 conf.setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, true); 083 } 084 085 @Before 086 public void setUp() throws IOException { 087 htu = new HBaseCommonTestingUtility(); 088 testDir = htu.getDataTestDir(); 089 fs = testDir.getFileSystem(htu.getConfiguration()); 090 assertTrue(testDir.depth() > 1); 091 092 setupConfig(htu.getConfiguration()); 093 logDir = new Path(testDir, "proc-logs"); 094 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); 095 procStore.start(PROCEDURE_STORE_SLOTS); 096 procStore.recoverLease(); 097 procStore.load(new LoadCounter()); 098 } 099 100 @After 101 public void tearDown() throws IOException { 102 procStore.stop(false); 103 fs.delete(logDir, true); 104 } 105 106 private void storeRestart(ProcedureStore.ProcedureLoader loader) throws Exception { 107 ProcedureTestingUtility.storeRestart(procStore, loader); 108 } 109 110 @Test 111 public void testEmptyRoll() throws Exception { 112 for (int i = 0; i < 10; ++i) { 113 procStore.periodicRollForTesting(); 114 } 115 assertEquals(1, procStore.getActiveLogs().size()); 116 FileStatus[] status = fs.listStatus(logDir); 117 assertEquals(1, status.length); 118 } 119 120 @Test 121 public void testRestartWithoutData() throws Exception { 122 for (int i = 0; i < 10; ++i) { 123 final LoadCounter loader = new LoadCounter(); 124 storeRestart(loader); 125 } 126 LOG.info("ACTIVE WALs " + procStore.getActiveLogs()); 127 assertEquals(1, procStore.getActiveLogs().size()); 128 FileStatus[] status = fs.listStatus(logDir); 129 assertEquals(1, status.length); 130 } 131 132 /** 133 * Tests that tracker for all old logs are loaded back after procedure store is restarted. 134 */ 135 @Test 136 public void trackersLoadedForAllOldLogs() throws Exception { 137 for (int i = 0; i <= 20; ++i) { 138 procStore.insert(new TestProcedure(i), null); 139 if (i > 0 && (i % 5) == 0) { 140 LoadCounter loader = new LoadCounter(); 141 storeRestart(loader); 142 } 143 } 144 assertEquals(5, procStore.getActiveLogs().size()); 145 for (int i = 0; i < procStore.getActiveLogs().size() - 1; ++i) { 146 ProcedureStoreTracker tracker = procStore.getActiveLogs().get(i).getTracker(); 147 assertTrue(tracker != null && !tracker.isEmpty()); 148 } 149 } 150 151 @Test 152 public void testWalCleanerSequentialClean() throws Exception { 153 final Procedure<?>[] procs = new Procedure[5]; 154 ArrayList<ProcedureWALFile> logs = null; 155 156 // Insert procedures and roll wal after every insert. 157 for (int i = 0; i < procs.length; i++) { 158 procs[i] = new TestSequentialProcedure(); 159 procStore.insert(procs[i], null); 160 procStore.rollWriterForTesting(); 161 logs = procStore.getActiveLogs(); 162 assertEquals(logs.size(), i + 2); // Extra 1 for current ongoing wal. 163 } 164 165 // Delete procedures in sequential order make sure that only the corresponding wal is deleted 166 // from logs list. 167 final int[] deleteOrder = new int[] { 0, 1, 2, 3, 4 }; 168 for (int i = 0; i < deleteOrder.length; i++) { 169 procStore.delete(procs[deleteOrder[i]].getProcId()); 170 procStore.removeInactiveLogsForTesting(); 171 assertFalse(logs.get(deleteOrder[i]).toString(), 172 procStore.getActiveLogs().contains(logs.get(deleteOrder[i]))); 173 assertEquals(procStore.getActiveLogs().size(), procs.length - i); 174 } 175 } 176 177 178 // Test that wal cleaner doesn't create holes in wal files list i.e. it only deletes files if 179 // they are in the starting of the list. 180 @Test 181 public void testWalCleanerNoHoles() throws Exception { 182 final Procedure<?>[] procs = new Procedure[5]; 183 ArrayList<ProcedureWALFile> logs = null; 184 // Insert procedures and roll wal after every insert. 185 for (int i = 0; i < procs.length; i++) { 186 procs[i] = new TestSequentialProcedure(); 187 procStore.insert(procs[i], null); 188 procStore.rollWriterForTesting(); 189 logs = procStore.getActiveLogs(); 190 assertEquals(i + 2, logs.size()); // Extra 1 for current ongoing wal. 191 } 192 193 for (int i = 1; i < procs.length; i++) { 194 procStore.delete(procs[i].getProcId()); 195 } 196 assertEquals(procs.length + 1, procStore.getActiveLogs().size()); 197 procStore.delete(procs[0].getProcId()); 198 assertEquals(1, procStore.getActiveLogs().size()); 199 } 200 201 @Test 202 public void testWalCleanerUpdates() throws Exception { 203 TestSequentialProcedure p1 = new TestSequentialProcedure(); 204 TestSequentialProcedure p2 = new TestSequentialProcedure(); 205 procStore.insert(p1, null); 206 procStore.insert(p2, null); 207 procStore.rollWriterForTesting(); 208 ProcedureWALFile firstLog = procStore.getActiveLogs().get(0); 209 procStore.update(p1); 210 procStore.rollWriterForTesting(); 211 procStore.update(p2); 212 procStore.rollWriterForTesting(); 213 procStore.removeInactiveLogsForTesting(); 214 assertFalse(procStore.getActiveLogs().contains(firstLog)); 215 } 216 217 @Test 218 public void testWalCleanerUpdatesDontLeaveHoles() throws Exception { 219 TestSequentialProcedure p1 = new TestSequentialProcedure(); 220 TestSequentialProcedure p2 = new TestSequentialProcedure(); 221 procStore.insert(p1, null); 222 procStore.insert(p2, null); 223 procStore.rollWriterForTesting(); // generates first log with p1 + p2 224 ProcedureWALFile log1 = procStore.getActiveLogs().get(0); 225 procStore.update(p2); 226 procStore.rollWriterForTesting(); // generates second log with p2 227 ProcedureWALFile log2 = procStore.getActiveLogs().get(1); 228 procStore.update(p2); 229 procStore.rollWriterForTesting(); // generates third log with p2 230 procStore.removeInactiveLogsForTesting(); // Shouldn't remove 2nd log. 231 assertEquals(4, procStore.getActiveLogs().size()); 232 procStore.update(p1); 233 procStore.rollWriterForTesting(); // generates fourth log with p1 234 procStore.removeInactiveLogsForTesting(); // Should remove first two logs. 235 assertEquals(3, procStore.getActiveLogs().size()); 236 assertFalse(procStore.getActiveLogs().contains(log1)); 237 assertFalse(procStore.getActiveLogs().contains(log2)); 238 } 239 240 @Test 241 public void testWalCleanerWithEmptyRolls() throws Exception { 242 final Procedure<?>[] procs = new Procedure[3]; 243 for (int i = 0; i < procs.length; ++i) { 244 procs[i] = new TestSequentialProcedure(); 245 procStore.insert(procs[i], null); 246 } 247 assertEquals(1, procStore.getActiveLogs().size()); 248 procStore.rollWriterForTesting(); 249 assertEquals(2, procStore.getActiveLogs().size()); 250 procStore.rollWriterForTesting(); 251 assertEquals(3, procStore.getActiveLogs().size()); 252 253 for (int i = 0; i < procs.length; ++i) { 254 procStore.update(procs[i]); 255 procStore.rollWriterForTesting(); 256 procStore.rollWriterForTesting(); 257 if (i < (procs.length - 1)) { 258 assertEquals(3 + ((i + 1) * 2), procStore.getActiveLogs().size()); 259 } 260 } 261 assertEquals(7, procStore.getActiveLogs().size()); 262 263 for (int i = 0; i < procs.length; ++i) { 264 procStore.delete(procs[i].getProcId()); 265 assertEquals(7 - ((i + 1) * 2), procStore.getActiveLogs().size()); 266 } 267 assertEquals(1, procStore.getActiveLogs().size()); 268 } 269 270 @Test 271 public void testEmptyLogLoad() throws Exception { 272 LoadCounter loader = new LoadCounter(); 273 storeRestart(loader); 274 assertEquals(0, loader.getMaxProcId()); 275 assertEquals(0, loader.getLoadedCount()); 276 assertEquals(0, loader.getCorruptedCount()); 277 } 278 279 @Test 280 public void testLoad() throws Exception { 281 Set<Long> procIds = new HashSet<>(); 282 283 // Insert something in the log 284 Procedure<?> proc1 = new TestSequentialProcedure(); 285 procIds.add(proc1.getProcId()); 286 procStore.insert(proc1, null); 287 288 Procedure<?> proc2 = new TestSequentialProcedure(); 289 Procedure<?>[] child2 = new Procedure[2]; 290 child2[0] = new TestSequentialProcedure(); 291 child2[1] = new TestSequentialProcedure(); 292 293 procIds.add(proc2.getProcId()); 294 procIds.add(child2[0].getProcId()); 295 procIds.add(child2[1].getProcId()); 296 procStore.insert(proc2, child2); 297 298 // Verify that everything is there 299 verifyProcIdsOnRestart(procIds); 300 301 // Update and delete something 302 procStore.update(proc1); 303 procStore.update(child2[1]); 304 procStore.delete(child2[1].getProcId()); 305 procIds.remove(child2[1].getProcId()); 306 307 // Verify that everything is there 308 verifyProcIdsOnRestart(procIds); 309 310 // Remove 4 byte from the trailers 311 procStore.stop(false); 312 FileStatus[] logs = fs.listStatus(logDir); 313 assertEquals(3, logs.length); 314 for (int i = 0; i < logs.length; ++i) { 315 corruptLog(logs[i], 4); 316 } 317 verifyProcIdsOnRestart(procIds); 318 } 319 320 @Test 321 public void testNoTrailerDoubleRestart() throws Exception { 322 // log-0001: proc 0, 1 and 2 are inserted 323 Procedure<?> proc0 = new TestSequentialProcedure(); 324 procStore.insert(proc0, null); 325 Procedure<?> proc1 = new TestSequentialProcedure(); 326 procStore.insert(proc1, null); 327 Procedure<?> proc2 = new TestSequentialProcedure(); 328 procStore.insert(proc2, null); 329 procStore.rollWriterForTesting(); 330 331 // log-0002: proc 1 deleted 332 procStore.delete(proc1.getProcId()); 333 procStore.rollWriterForTesting(); 334 335 // log-0003: proc 2 is update 336 procStore.update(proc2); 337 procStore.rollWriterForTesting(); 338 339 // log-0004: proc 2 deleted 340 procStore.delete(proc2.getProcId()); 341 342 // stop the store and remove the trailer 343 procStore.stop(false); 344 FileStatus[] logs = fs.listStatus(logDir); 345 assertEquals(4, logs.length); 346 for (int i = 0; i < logs.length; ++i) { 347 corruptLog(logs[i], 4); 348 } 349 350 // Test Load 1 351 // Restart the store (avoid cleaning up the files, to check the rebuilded trackers) 352 htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false); 353 LoadCounter loader = new LoadCounter(); 354 storeRestart(loader); 355 assertEquals(1, loader.getLoadedCount()); 356 assertEquals(0, loader.getCorruptedCount()); 357 358 // Test Load 2 359 assertEquals(5, fs.listStatus(logDir).length); 360 loader = new LoadCounter(); 361 storeRestart(loader); 362 assertEquals(1, loader.getLoadedCount()); 363 assertEquals(0, loader.getCorruptedCount()); 364 365 // remove proc-0 366 procStore.delete(proc0.getProcId()); 367 procStore.periodicRollForTesting(); 368 assertEquals(1, fs.listStatus(logDir).length); 369 storeRestart(loader); 370 } 371 372 @Test 373 public void testProcIdHoles() throws Exception { 374 // Insert 375 for (int i = 0; i < 100; i += 2) { 376 procStore.insert(new TestProcedure(i), null); 377 if (i > 0 && (i % 10) == 0) { 378 LoadCounter loader = new LoadCounter(); 379 storeRestart(loader); 380 assertEquals(0, loader.getCorruptedCount()); 381 assertEquals((i / 2) + 1, loader.getLoadedCount()); 382 } 383 } 384 assertEquals(10, procStore.getActiveLogs().size()); 385 386 // Delete 387 for (int i = 0; i < 100; i += 2) { 388 procStore.delete(i); 389 } 390 assertEquals(1, procStore.getActiveLogs().size()); 391 392 LoadCounter loader = new LoadCounter(); 393 storeRestart(loader); 394 assertEquals(0, loader.getLoadedCount()); 395 assertEquals(0, loader.getCorruptedCount()); 396 } 397 398 @Test 399 public void testCorruptedTrailer() throws Exception { 400 // Insert something 401 for (int i = 0; i < 100; ++i) { 402 procStore.insert(new TestSequentialProcedure(), null); 403 } 404 405 // Stop the store 406 procStore.stop(false); 407 408 // Remove 4 byte from the trailer 409 FileStatus[] logs = fs.listStatus(logDir); 410 assertEquals(1, logs.length); 411 corruptLog(logs[0], 4); 412 413 LoadCounter loader = new LoadCounter(); 414 storeRestart(loader); 415 assertEquals(100, loader.getLoadedCount()); 416 assertEquals(0, loader.getCorruptedCount()); 417 } 418 419 private static void assertUpdated(final ProcedureStoreTracker tracker, 420 final Procedure<?>[] procs, final int[] updatedProcs, final int[] nonUpdatedProcs) { 421 for (int index : updatedProcs) { 422 long procId = procs[index].getProcId(); 423 assertTrue("Procedure id : " + procId, tracker.isModified(procId)); 424 } 425 for (int index : nonUpdatedProcs) { 426 long procId = procs[index].getProcId(); 427 assertFalse("Procedure id : " + procId, tracker.isModified(procId)); 428 } 429 } 430 431 private static void assertDeleted(final ProcedureStoreTracker tracker, 432 final Procedure<?>[] procs, final int[] deletedProcs, final int[] nonDeletedProcs) { 433 for (int index : deletedProcs) { 434 long procId = procs[index].getProcId(); 435 assertEquals("Procedure id : " + procId, 436 ProcedureStoreTracker.DeleteState.YES, tracker.isDeleted(procId)); 437 } 438 for (int index : nonDeletedProcs) { 439 long procId = procs[index].getProcId(); 440 assertEquals("Procedure id : " + procId, 441 ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(procId)); 442 } 443 } 444 445 @Test 446 public void testCorruptedTrailersRebuild() throws Exception { 447 final Procedure<?>[] procs = new Procedure[6]; 448 for (int i = 0; i < procs.length; ++i) { 449 procs[i] = new TestSequentialProcedure(); 450 } 451 // Log State (I=insert, U=updated, D=delete) 452 // | log 1 | log 2 | log 3 | 453 // 0 | I, D | | | 454 // 1 | I | | | 455 // 2 | I | D | | 456 // 3 | I | U | | 457 // 4 | | I | D | 458 // 5 | | | I | 459 procStore.insert(procs[0], null); 460 procStore.insert(procs[1], null); 461 procStore.insert(procs[2], null); 462 procStore.insert(procs[3], null); 463 procStore.delete(procs[0].getProcId()); 464 procStore.rollWriterForTesting(); 465 procStore.delete(procs[2].getProcId()); 466 procStore.update(procs[3]); 467 procStore.insert(procs[4], null); 468 procStore.rollWriterForTesting(); 469 procStore.delete(procs[4].getProcId()); 470 procStore.insert(procs[5], null); 471 472 // Stop the store 473 procStore.stop(false); 474 475 // Remove 4 byte from the trailers 476 final FileStatus[] logs = fs.listStatus(logDir); 477 assertEquals(3, logs.length); 478 for (int i = 0; i < logs.length; ++i) { 479 corruptLog(logs[i], 4); 480 } 481 482 // Restart the store (avoid cleaning up the files, to check the rebuilded trackers) 483 htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false); 484 final LoadCounter loader = new LoadCounter(); 485 storeRestart(loader); 486 assertEquals(3, loader.getLoadedCount()); // procs 1, 3 and 5 487 assertEquals(0, loader.getCorruptedCount()); 488 489 // Check the Trackers 490 final ArrayList<ProcedureWALFile> walFiles = procStore.getActiveLogs(); 491 LOG.info("WALs " + walFiles); 492 assertEquals(4, walFiles.size()); 493 LOG.info("Checking wal " + walFiles.get(0)); 494 assertUpdated(walFiles.get(0).getTracker(), procs, new int[]{0, 1, 2, 3}, new int[] {4, 5}); 495 LOG.info("Checking wal " + walFiles.get(1)); 496 assertUpdated(walFiles.get(1).getTracker(), procs, new int[]{2, 3, 4}, new int[] {0, 1, 5}); 497 LOG.info("Checking wal " + walFiles.get(2)); 498 assertUpdated(walFiles.get(2).getTracker(), procs, new int[]{4, 5}, new int[] {0, 1, 2, 3}); 499 LOG.info("Checking global tracker "); 500 assertDeleted(procStore.getStoreTracker(), procs, new int[]{0, 2, 4}, new int[] {1, 3, 5}); 501 } 502 503 @Test 504 public void testCorruptedEntries() throws Exception { 505 // Insert something 506 for (int i = 0; i < 100; ++i) { 507 procStore.insert(new TestSequentialProcedure(), null); 508 } 509 510 // Stop the store 511 procStore.stop(false); 512 513 // Remove some byte from the log 514 // (enough to cut the trailer and corrupt some entries) 515 FileStatus[] logs = fs.listStatus(logDir); 516 assertEquals(1, logs.length); 517 corruptLog(logs[0], 1823); 518 519 LoadCounter loader = new LoadCounter(); 520 storeRestart(loader); 521 assertTrue(procStore.getCorruptedLogs() != null); 522 assertEquals(1, procStore.getCorruptedLogs().size()); 523 assertEquals(87, loader.getLoadedCount()); 524 assertEquals(0, loader.getCorruptedCount()); 525 } 526 527 @Test 528 public void testCorruptedProcedures() throws Exception { 529 // Insert root-procedures 530 TestProcedure[] rootProcs = new TestProcedure[10]; 531 for (int i = 1; i <= rootProcs.length; i++) { 532 rootProcs[i-1] = new TestProcedure(i, 0); 533 procStore.insert(rootProcs[i-1], null); 534 rootProcs[i-1].addStackId(0); 535 procStore.update(rootProcs[i-1]); 536 } 537 // insert root-child txn 538 procStore.rollWriterForTesting(); 539 for (int i = 1; i <= rootProcs.length; i++) { 540 TestProcedure b = new TestProcedure(rootProcs.length + i, i); 541 rootProcs[i-1].addStackId(1); 542 procStore.insert(rootProcs[i-1], new Procedure[] { b }); 543 } 544 // insert child updates 545 procStore.rollWriterForTesting(); 546 for (int i = 1; i <= rootProcs.length; i++) { 547 procStore.update(new TestProcedure(rootProcs.length + i, i)); 548 } 549 550 // Stop the store 551 procStore.stop(false); 552 553 // the first log was removed, 554 // we have insert-txn and updates in the others so everything is fine 555 FileStatus[] logs = fs.listStatus(logDir); 556 assertEquals(Arrays.toString(logs), 2, logs.length); 557 Arrays.sort(logs, new Comparator<FileStatus>() { 558 @Override 559 public int compare(FileStatus o1, FileStatus o2) { 560 return o1.getPath().getName().compareTo(o2.getPath().getName()); 561 } 562 }); 563 564 LoadCounter loader = new LoadCounter(); 565 storeRestart(loader); 566 assertEquals(rootProcs.length * 2, loader.getLoadedCount()); 567 assertEquals(0, loader.getCorruptedCount()); 568 569 // Remove the second log, we have lost all the root/parent references 570 fs.delete(logs[0].getPath(), false); 571 loader.reset(); 572 storeRestart(loader); 573 assertEquals(0, loader.getLoadedCount()); 574 assertEquals(rootProcs.length, loader.getCorruptedCount()); 575 for (Procedure<?> proc : loader.getCorrupted()) { 576 assertTrue(proc.toString(), proc.getParentProcId() <= rootProcs.length); 577 assertTrue(proc.toString(), 578 proc.getProcId() > rootProcs.length && proc.getProcId() <= (rootProcs.length * 2)); 579 } 580 } 581 582 @Test 583 public void testRollAndRemove() throws IOException { 584 // Insert something in the log 585 Procedure<?> proc1 = new TestSequentialProcedure(); 586 procStore.insert(proc1, null); 587 588 Procedure<?> proc2 = new TestSequentialProcedure(); 589 procStore.insert(proc2, null); 590 591 // roll the log, now we have 2 592 procStore.rollWriterForTesting(); 593 assertEquals(2, procStore.getActiveLogs().size()); 594 595 // everything will be up to date in the second log 596 // so we can remove the first one 597 procStore.update(proc1); 598 procStore.update(proc2); 599 assertEquals(1, procStore.getActiveLogs().size()); 600 601 // roll the log, now we have 2 602 procStore.rollWriterForTesting(); 603 assertEquals(2, procStore.getActiveLogs().size()); 604 605 // remove everything active 606 // so we can remove all the logs 607 procStore.delete(proc1.getProcId()); 608 procStore.delete(proc2.getProcId()); 609 assertEquals(1, procStore.getActiveLogs().size()); 610 } 611 612 @Test 613 public void testFileNotFoundDuringLeaseRecovery() throws IOException { 614 final TestProcedure[] procs = new TestProcedure[3]; 615 for (int i = 0; i < procs.length; ++i) { 616 procs[i] = new TestProcedure(i + 1, 0); 617 procStore.insert(procs[i], null); 618 } 619 procStore.rollWriterForTesting(); 620 for (int i = 0; i < procs.length; ++i) { 621 procStore.update(procs[i]); 622 procStore.rollWriterForTesting(); 623 } 624 procStore.stop(false); 625 626 FileStatus[] status = fs.listStatus(logDir); 627 assertEquals(procs.length + 1, status.length); 628 629 // simulate another active master removing the wals 630 procStore = new WALProcedureStore(htu.getConfiguration(), logDir, null, 631 new WALProcedureStore.LeaseRecovery() { 632 private int count = 0; 633 634 @Override 635 public void recoverFileLease(FileSystem fs, Path path) throws IOException { 636 if (++count <= 2) { 637 fs.delete(path, false); 638 LOG.debug("Simulate FileNotFound at count=" + count + " for " + path); 639 throw new FileNotFoundException("test file not found " + path); 640 } 641 LOG.debug("Simulate recoverFileLease() at count=" + count + " for " + path); 642 } 643 }); 644 645 final LoadCounter loader = new LoadCounter(); 646 procStore.start(PROCEDURE_STORE_SLOTS); 647 procStore.recoverLease(); 648 procStore.load(loader); 649 assertEquals(procs.length, loader.getMaxProcId()); 650 assertEquals(1, loader.getRunnableCount()); 651 assertEquals(0, loader.getCompletedCount()); 652 assertEquals(0, loader.getCorruptedCount()); 653 } 654 655 @Test 656 public void testLogFileAleadExists() throws IOException { 657 final boolean[] tested = {false}; 658 WALProcedureStore mStore = Mockito.spy(procStore); 659 660 Answer<Boolean> ans = new Answer<Boolean>() { 661 @Override 662 public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable { 663 long logId = ((Long) invocationOnMock.getArgument(0)).longValue(); 664 switch ((int) logId) { 665 case 2: 666 // Create a file so that real rollWriter() runs into file exists condition 667 Path logFilePath = mStore.getLogFilePath(logId); 668 mStore.getFileSystem().create(logFilePath); 669 break; 670 case 3: 671 // Success only when we retry with logId 3 672 tested[0] = true; 673 default: 674 break; 675 } 676 return (Boolean) invocationOnMock.callRealMethod(); 677 } 678 }; 679 680 // First time Store has one log file, next id will be 2 681 Mockito.doAnswer(ans).when(mStore).rollWriter(2); 682 // next time its 3 683 Mockito.doAnswer(ans).when(mStore).rollWriter(3); 684 685 mStore.recoverLease(); 686 assertTrue(tested[0]); 687 } 688 689 @Test 690 public void testLoadChildren() throws Exception { 691 TestProcedure a = new TestProcedure(1, 0); 692 TestProcedure b = new TestProcedure(2, 1); 693 TestProcedure c = new TestProcedure(3, 1); 694 695 // INIT 696 procStore.insert(a, null); 697 698 // Run A first step 699 a.addStackId(0); 700 procStore.update(a); 701 702 // Run A second step 703 a.addStackId(1); 704 procStore.insert(a, new Procedure[] { b, c }); 705 706 // Run B first step 707 b.addStackId(2); 708 procStore.update(b); 709 710 // Run C first and last step 711 c.addStackId(3); 712 procStore.update(c); 713 714 // Run B second setp 715 b.addStackId(4); 716 procStore.update(b); 717 718 // back to A 719 a.addStackId(5); 720 a.setSuccessState(); 721 procStore.delete(a, new long[] { b.getProcId(), c.getProcId() }); 722 restartAndAssert(3, 0, 1, 0); 723 } 724 725 @Test 726 public void testBatchDelete() throws Exception { 727 for (int i = 1; i < 10; ++i) { 728 procStore.insert(new TestProcedure(i), null); 729 } 730 731 // delete nothing 732 long[] toDelete = new long[] { 1, 2, 3, 4 }; 733 procStore.delete(toDelete, 2, 0); 734 LoadCounter loader = restartAndAssert(9, 9, 0, 0); 735 for (int i = 1; i < 10; ++i) { 736 assertEquals(true, loader.isRunnable(i)); 737 } 738 739 // delete the full "toDelete" array (2, 4, 6, 8) 740 toDelete = new long[] { 2, 4, 6, 8 }; 741 procStore.delete(toDelete, 0, toDelete.length); 742 loader = restartAndAssert(9, 5, 0, 0); 743 for (int i = 1; i < 10; ++i) { 744 assertEquals(i % 2 != 0, loader.isRunnable(i)); 745 } 746 747 // delete a slice of "toDelete" (1, 3) 748 toDelete = new long[] { 5, 7, 1, 3, 9 }; 749 procStore.delete(toDelete, 2, 2); 750 loader = restartAndAssert(9, 3, 0, 0); 751 for (int i = 1; i < 10; ++i) { 752 assertEquals(i > 3 && i % 2 != 0, loader.isRunnable(i)); 753 } 754 755 // delete a single item (5) 756 toDelete = new long[] { 5 }; 757 procStore.delete(toDelete, 0, 1); 758 loader = restartAndAssert(9, 2, 0, 0); 759 for (int i = 1; i < 10; ++i) { 760 assertEquals(i > 5 && i % 2 != 0, loader.isRunnable(i)); 761 } 762 763 // delete remaining using a slice of "toDelete" (7, 9) 764 toDelete = new long[] { 0, 7, 9 }; 765 procStore.delete(toDelete, 1, 2); 766 loader = restartAndAssert(0, 0, 0, 0); 767 for (int i = 1; i < 10; ++i) { 768 assertEquals(false, loader.isRunnable(i)); 769 } 770 } 771 772 @Test 773 public void testBatchInsert() throws Exception { 774 final int count = 10; 775 final TestProcedure[] procs = new TestProcedure[count]; 776 for (int i = 0; i < procs.length; ++i) { 777 procs[i] = new TestProcedure(i + 1); 778 } 779 procStore.insert(procs); 780 restartAndAssert(count, count, 0, 0); 781 782 for (int i = 0; i < procs.length; ++i) { 783 final long procId = procs[i].getProcId(); 784 procStore.delete(procId); 785 restartAndAssert(procId != count ? count : 0, count - (i + 1), 0, 0); 786 } 787 procStore.removeInactiveLogsForTesting(); 788 assertEquals("WALs=" + procStore.getActiveLogs(), 1, procStore.getActiveLogs().size()); 789 } 790 791 @Test 792 public void testWALDirAndWALArchiveDir() throws IOException { 793 Configuration conf = htu.getConfiguration(); 794 procStore = createWALProcedureStore(conf); 795 assertEquals(procStore.getFileSystem(), procStore.getWalArchiveDir().getFileSystem(conf)); 796 } 797 798 private WALProcedureStore createWALProcedureStore(Configuration conf) throws IOException { 799 return new WALProcedureStore(conf, new WALProcedureStore.LeaseRecovery() { 800 @Override 801 public void recoverFileLease(FileSystem fs, Path path) throws IOException { 802 // no-op 803 } 804 }); 805 } 806 807 private LoadCounter restartAndAssert(long maxProcId, long runnableCount, 808 int completedCount, int corruptedCount) throws Exception { 809 return ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId, 810 runnableCount, completedCount, corruptedCount); 811 } 812 813 private void corruptLog(final FileStatus logFile, final long dropBytes) 814 throws IOException { 815 assertTrue(logFile.getLen() > dropBytes); 816 LOG.debug("corrupt log " + logFile.getPath() + 817 " size=" + logFile.getLen() + " drop=" + dropBytes); 818 Path tmpPath = new Path(testDir, "corrupted.log"); 819 InputStream in = fs.open(logFile.getPath()); 820 OutputStream out = fs.create(tmpPath); 821 IOUtils.copyBytes(in, out, logFile.getLen() - dropBytes, true); 822 if (!fs.rename(tmpPath, logFile.getPath())) { 823 throw new IOException("Unable to rename"); 824 } 825 } 826 827 private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception { 828 LOG.debug("expected: " + procIds); 829 LoadCounter loader = new LoadCounter(); 830 storeRestart(loader); 831 assertEquals(procIds.size(), loader.getLoadedCount()); 832 assertEquals(0, loader.getCorruptedCount()); 833 } 834 835 public static class TestSequentialProcedure extends SequentialProcedure<Void> { 836 private static long seqid = 0; 837 838 public TestSequentialProcedure() { 839 setProcId(++seqid); 840 } 841 842 @Override 843 protected Procedure<Void>[] execute(Void env) { 844 return null; 845 } 846 847 @Override 848 protected void rollback(Void env) { 849 } 850 851 @Override 852 protected boolean abort(Void env) { 853 return false; 854 } 855 856 @Override 857 protected void serializeStateData(ProcedureStateSerializer serializer) 858 throws IOException { 859 long procId = getProcId(); 860 if (procId % 2 == 0) { 861 Int64Value.Builder builder = Int64Value.newBuilder().setValue(procId); 862 serializer.serialize(builder.build()); 863 } 864 } 865 866 @Override 867 protected void deserializeStateData(ProcedureStateSerializer serializer) 868 throws IOException { 869 long procId = getProcId(); 870 if (procId % 2 == 0) { 871 Int64Value value = serializer.deserialize(Int64Value.class); 872 assertEquals(procId, value.getValue()); 873 } 874 } 875 } 876}