001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.FileNotFoundException;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.OutputStream;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Comparator;
031import java.util.HashSet;
032import java.util.Set;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileStatus;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.procedure2.Procedure;
041import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
042import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
043import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
044import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
045import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
046import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
047import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
048import org.apache.hadoop.hbase.testclassification.MasterTests;
049import org.apache.hadoop.hbase.testclassification.SmallTests;
050import org.apache.hadoop.io.IOUtils;
051import org.junit.After;
052import org.junit.Before;
053import org.junit.ClassRule;
054import org.junit.Test;
055import org.junit.experimental.categories.Category;
056import org.mockito.Mockito;
057import org.mockito.invocation.InvocationOnMock;
058import org.mockito.stubbing.Answer;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value;
063
064@Category({MasterTests.class, SmallTests.class})
065public class TestWALProcedureStore {
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068      HBaseClassTestRule.forClass(TestWALProcedureStore.class);
069
070  private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStore.class);
071
072  private static final int PROCEDURE_STORE_SLOTS = 1;
073
074  private WALProcedureStore procStore;
075
076  private HBaseCommonTestingUtility htu;
077  private FileSystem fs;
078  private Path testDir;
079  private Path logDir;
080
081  private void setupConfig(final Configuration conf) {
082    conf.setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, true);
083  }
084
085  @Before
086  public void setUp() throws IOException {
087    htu = new HBaseCommonTestingUtility();
088    testDir = htu.getDataTestDir();
089    htu.getConfiguration().set(HConstants.HBASE_DIR, testDir.toString());
090    fs = testDir.getFileSystem(htu.getConfiguration());
091    htu.getConfiguration().set(HConstants.HBASE_DIR, testDir.toString());
092    assertTrue(testDir.depth() > 1);
093
094    setupConfig(htu.getConfiguration());
095    logDir = new Path(testDir, "proc-logs");
096    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
097    procStore.start(PROCEDURE_STORE_SLOTS);
098    procStore.recoverLease();
099    procStore.load(new LoadCounter());
100  }
101
102  @After
103  public void tearDown() throws IOException {
104    procStore.stop(false);
105    fs.delete(logDir, true);
106  }
107
108  private void storeRestart(ProcedureStore.ProcedureLoader loader) throws Exception {
109    ProcedureTestingUtility.storeRestart(procStore, loader);
110  }
111
112  @Test
113  public void testEmptyRoll() throws Exception {
114    for (int i = 0; i < 10; ++i) {
115      procStore.periodicRollForTesting();
116    }
117    assertEquals(1, procStore.getActiveLogs().size());
118    FileStatus[] status = fs.listStatus(logDir);
119    assertEquals(1, status.length);
120  }
121
122  @Test
123  public void testRestartWithoutData() throws Exception {
124    for (int i = 0; i < 10; ++i) {
125      final LoadCounter loader = new LoadCounter();
126      storeRestart(loader);
127    }
128    LOG.info("ACTIVE WALs " + procStore.getActiveLogs());
129    assertEquals(1, procStore.getActiveLogs().size());
130    FileStatus[] status = fs.listStatus(logDir);
131    assertEquals(1, status.length);
132  }
133
134  /**
135   * Tests that tracker for all old logs are loaded back after procedure store is restarted.
136   */
137  @Test
138  public void trackersLoadedForAllOldLogs() throws Exception {
139    for (int i = 0; i <= 20; ++i) {
140      procStore.insert(new TestProcedure(i), null);
141      if (i > 0 && (i % 5) == 0) {
142        LoadCounter loader = new LoadCounter();
143        storeRestart(loader);
144      }
145    }
146    assertEquals(5, procStore.getActiveLogs().size());
147    for (int i = 0; i < procStore.getActiveLogs().size() - 1; ++i) {
148      ProcedureStoreTracker tracker = procStore.getActiveLogs().get(i).getTracker();
149      assertTrue(tracker != null && !tracker.isEmpty());
150    }
151  }
152
153  @Test
154  public void testWalCleanerSequentialClean() throws Exception {
155    final Procedure<?>[] procs = new Procedure[5];
156    ArrayList<ProcedureWALFile> logs = null;
157
158    // Insert procedures and roll wal after every insert.
159    for (int i = 0; i < procs.length; i++) {
160      procs[i] = new TestSequentialProcedure();
161      procStore.insert(procs[i], null);
162      procStore.rollWriterForTesting();
163      logs = procStore.getActiveLogs();
164      assertEquals(logs.size(), i + 2);  // Extra 1 for current ongoing wal.
165    }
166
167    // Delete procedures in sequential order make sure that only the corresponding wal is deleted
168    // from logs list.
169    final int[] deleteOrder = new int[] { 0, 1, 2, 3, 4 };
170    for (int i = 0; i < deleteOrder.length; i++) {
171      procStore.delete(procs[deleteOrder[i]].getProcId());
172      procStore.removeInactiveLogsForTesting();
173      assertFalse(logs.get(deleteOrder[i]).toString(),
174        procStore.getActiveLogs().contains(logs.get(deleteOrder[i])));
175      assertEquals(procStore.getActiveLogs().size(), procs.length - i);
176    }
177  }
178
179
180  // Test that wal cleaner doesn't create holes in wal files list i.e. it only deletes files if
181  // they are in the starting of the list.
182  @Test
183  public void testWalCleanerNoHoles() throws Exception {
184    final Procedure<?>[] procs = new Procedure[5];
185    ArrayList<ProcedureWALFile> logs = null;
186    // Insert procedures and roll wal after every insert.
187    for (int i = 0; i < procs.length; i++) {
188      procs[i] = new TestSequentialProcedure();
189      procStore.insert(procs[i], null);
190      procStore.rollWriterForTesting();
191      logs = procStore.getActiveLogs();
192      assertEquals(i + 2, logs.size());  // Extra 1 for current ongoing wal.
193    }
194
195    for (int i = 1; i < procs.length; i++) {
196      procStore.delete(procs[i].getProcId());
197    }
198    assertEquals(procs.length + 1, procStore.getActiveLogs().size());
199    procStore.delete(procs[0].getProcId());
200    assertEquals(1, procStore.getActiveLogs().size());
201  }
202
203  @Test
204  public void testWalCleanerUpdates() throws Exception {
205    TestSequentialProcedure p1 = new TestSequentialProcedure();
206    TestSequentialProcedure p2 = new TestSequentialProcedure();
207    procStore.insert(p1, null);
208    procStore.insert(p2, null);
209    procStore.rollWriterForTesting();
210    ProcedureWALFile firstLog = procStore.getActiveLogs().get(0);
211    procStore.update(p1);
212    procStore.rollWriterForTesting();
213    procStore.update(p2);
214    procStore.rollWriterForTesting();
215    procStore.removeInactiveLogsForTesting();
216    assertFalse(procStore.getActiveLogs().contains(firstLog));
217  }
218
219  @Test
220  public void testWalCleanerUpdatesDontLeaveHoles() throws Exception {
221    TestSequentialProcedure p1 = new TestSequentialProcedure();
222    TestSequentialProcedure p2 = new TestSequentialProcedure();
223    procStore.insert(p1, null);
224    procStore.insert(p2, null);
225    procStore.rollWriterForTesting();  // generates first log with p1 + p2
226    ProcedureWALFile log1 = procStore.getActiveLogs().get(0);
227    procStore.update(p2);
228    procStore.rollWriterForTesting();  // generates second log with p2
229    ProcedureWALFile log2 = procStore.getActiveLogs().get(1);
230    procStore.update(p2);
231    procStore.rollWriterForTesting();  // generates third log with p2
232    procStore.removeInactiveLogsForTesting();  // Shouldn't remove 2nd log.
233    assertEquals(4, procStore.getActiveLogs().size());
234    procStore.update(p1);
235    procStore.rollWriterForTesting();  // generates fourth log with p1
236    procStore.removeInactiveLogsForTesting();  // Should remove first two logs.
237    assertEquals(3, procStore.getActiveLogs().size());
238    assertFalse(procStore.getActiveLogs().contains(log1));
239    assertFalse(procStore.getActiveLogs().contains(log2));
240  }
241
242  @Test
243  public void testWalCleanerWithEmptyRolls() throws Exception {
244    final Procedure<?>[] procs = new Procedure[3];
245    for (int i = 0; i < procs.length; ++i) {
246      procs[i] = new TestSequentialProcedure();
247      procStore.insert(procs[i], null);
248    }
249    assertEquals(1, procStore.getActiveLogs().size());
250    procStore.rollWriterForTesting();
251    assertEquals(2, procStore.getActiveLogs().size());
252    procStore.rollWriterForTesting();
253    assertEquals(3, procStore.getActiveLogs().size());
254
255    for (int i = 0; i < procs.length; ++i) {
256      procStore.update(procs[i]);
257      procStore.rollWriterForTesting();
258      procStore.rollWriterForTesting();
259      if (i < (procs.length - 1)) {
260        assertEquals(3 + ((i + 1) * 2), procStore.getActiveLogs().size());
261      }
262    }
263    assertEquals(7, procStore.getActiveLogs().size());
264
265    for (int i = 0; i < procs.length; ++i) {
266      procStore.delete(procs[i].getProcId());
267      assertEquals(7 - ((i + 1) * 2), procStore.getActiveLogs().size());
268    }
269    assertEquals(1, procStore.getActiveLogs().size());
270  }
271
272  @Test
273  public void testEmptyLogLoad() throws Exception {
274    LoadCounter loader = new LoadCounter();
275    storeRestart(loader);
276    assertEquals(0, loader.getMaxProcId());
277    assertEquals(0, loader.getLoadedCount());
278    assertEquals(0, loader.getCorruptedCount());
279  }
280
281  @Test
282  public void testLoad() throws Exception {
283    Set<Long> procIds = new HashSet<>();
284
285    // Insert something in the log
286    Procedure<?> proc1 = new TestSequentialProcedure();
287    procIds.add(proc1.getProcId());
288    procStore.insert(proc1, null);
289
290    Procedure<?> proc2 = new TestSequentialProcedure();
291    Procedure<?>[] child2 = new Procedure[2];
292    child2[0] = new TestSequentialProcedure();
293    child2[1] = new TestSequentialProcedure();
294
295    procIds.add(proc2.getProcId());
296    procIds.add(child2[0].getProcId());
297    procIds.add(child2[1].getProcId());
298    procStore.insert(proc2, child2);
299
300    // Verify that everything is there
301    verifyProcIdsOnRestart(procIds);
302
303    // Update and delete something
304    procStore.update(proc1);
305    procStore.update(child2[1]);
306    procStore.delete(child2[1].getProcId());
307    procIds.remove(child2[1].getProcId());
308
309    // Verify that everything is there
310    verifyProcIdsOnRestart(procIds);
311
312    // Remove 4 byte from the trailers
313    procStore.stop(false);
314    FileStatus[] logs = fs.listStatus(logDir);
315    assertEquals(3, logs.length);
316    for (int i = 0; i < logs.length; ++i) {
317      corruptLog(logs[i], 4);
318    }
319    verifyProcIdsOnRestart(procIds);
320  }
321
322  @Test
323  public void testNoTrailerDoubleRestart() throws Exception {
324    // log-0001: proc 0, 1 and 2 are inserted
325    Procedure<?> proc0 = new TestSequentialProcedure();
326    procStore.insert(proc0, null);
327    Procedure<?> proc1 = new TestSequentialProcedure();
328    procStore.insert(proc1, null);
329    Procedure<?> proc2 = new TestSequentialProcedure();
330    procStore.insert(proc2, null);
331    procStore.rollWriterForTesting();
332
333    // log-0002: proc 1 deleted
334    procStore.delete(proc1.getProcId());
335    procStore.rollWriterForTesting();
336
337    // log-0003: proc 2 is update
338    procStore.update(proc2);
339    procStore.rollWriterForTesting();
340
341    // log-0004: proc 2 deleted
342    procStore.delete(proc2.getProcId());
343
344    // stop the store and remove the trailer
345    procStore.stop(false);
346    FileStatus[] logs = fs.listStatus(logDir);
347    assertEquals(4, logs.length);
348    for (int i = 0; i < logs.length; ++i) {
349      corruptLog(logs[i], 4);
350    }
351
352    // Test Load 1
353    // Restart the store (avoid cleaning up the files, to check the rebuilded trackers)
354    htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false);
355    LoadCounter loader = new LoadCounter();
356    storeRestart(loader);
357    assertEquals(1, loader.getLoadedCount());
358    assertEquals(0, loader.getCorruptedCount());
359
360    // Test Load 2
361    assertEquals(5, fs.listStatus(logDir).length);
362    loader = new LoadCounter();
363    storeRestart(loader);
364    assertEquals(1, loader.getLoadedCount());
365    assertEquals(0, loader.getCorruptedCount());
366
367    // remove proc-0
368    procStore.delete(proc0.getProcId());
369    procStore.periodicRollForTesting();
370    assertEquals(1, fs.listStatus(logDir).length);
371    storeRestart(loader);
372  }
373
374  @Test
375  public void testProcIdHoles() throws Exception {
376    // Insert
377    for (int i = 0; i < 100; i += 2) {
378      procStore.insert(new TestProcedure(i), null);
379      if (i > 0 && (i % 10) == 0) {
380        LoadCounter loader = new LoadCounter();
381        storeRestart(loader);
382        assertEquals(0, loader.getCorruptedCount());
383        assertEquals((i / 2) + 1, loader.getLoadedCount());
384      }
385    }
386    assertEquals(10, procStore.getActiveLogs().size());
387
388    // Delete
389    for (int i = 0; i < 100; i += 2) {
390      procStore.delete(i);
391    }
392    assertEquals(1, procStore.getActiveLogs().size());
393
394    LoadCounter loader = new LoadCounter();
395    storeRestart(loader);
396    assertEquals(0, loader.getLoadedCount());
397    assertEquals(0, loader.getCorruptedCount());
398  }
399
400  @Test
401  public void testCorruptedTrailer() throws Exception {
402    // Insert something
403    for (int i = 0; i < 100; ++i) {
404      procStore.insert(new TestSequentialProcedure(), null);
405    }
406
407    // Stop the store
408    procStore.stop(false);
409
410    // Remove 4 byte from the trailer
411    FileStatus[] logs = fs.listStatus(logDir);
412    assertEquals(1, logs.length);
413    corruptLog(logs[0], 4);
414
415    LoadCounter loader = new LoadCounter();
416    storeRestart(loader);
417    assertEquals(100, loader.getLoadedCount());
418    assertEquals(0, loader.getCorruptedCount());
419  }
420
421  private static void assertUpdated(final ProcedureStoreTracker tracker,
422      final Procedure<?>[] procs, final int[] updatedProcs, final int[] nonUpdatedProcs) {
423    for (int index : updatedProcs) {
424      long procId = procs[index].getProcId();
425      assertTrue("Procedure id : " + procId, tracker.isModified(procId));
426    }
427    for (int index : nonUpdatedProcs) {
428      long procId = procs[index].getProcId();
429      assertFalse("Procedure id : " + procId, tracker.isModified(procId));
430    }
431  }
432
433  private static void assertDeleted(final ProcedureStoreTracker tracker,
434      final Procedure<?>[] procs, final int[] deletedProcs, final int[] nonDeletedProcs) {
435    for (int index : deletedProcs) {
436      long procId = procs[index].getProcId();
437      assertEquals("Procedure id : " + procId,
438          ProcedureStoreTracker.DeleteState.YES, tracker.isDeleted(procId));
439    }
440    for (int index : nonDeletedProcs) {
441      long procId = procs[index].getProcId();
442      assertEquals("Procedure id : " + procId,
443          ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(procId));
444    }
445  }
446
447  @Test
448  public void testCorruptedTrailersRebuild() throws Exception {
449    final Procedure<?>[] procs = new Procedure[6];
450    for (int i = 0; i < procs.length; ++i) {
451      procs[i] = new TestSequentialProcedure();
452    }
453    // Log State (I=insert, U=updated, D=delete)
454    //   | log 1 | log 2 | log 3 |
455    // 0 | I, D  |       |       |
456    // 1 | I     |       |       |
457    // 2 | I     | D     |       |
458    // 3 | I     | U     |       |
459    // 4 |       | I     | D     |
460    // 5 |       |       | I     |
461    procStore.insert(procs[0], null);
462    procStore.insert(procs[1], null);
463    procStore.insert(procs[2], null);
464    procStore.insert(procs[3], null);
465    procStore.delete(procs[0].getProcId());
466    procStore.rollWriterForTesting();
467    procStore.delete(procs[2].getProcId());
468    procStore.update(procs[3]);
469    procStore.insert(procs[4], null);
470    procStore.rollWriterForTesting();
471    procStore.delete(procs[4].getProcId());
472    procStore.insert(procs[5], null);
473
474    // Stop the store
475    procStore.stop(false);
476
477    // Remove 4 byte from the trailers
478    final FileStatus[] logs = fs.listStatus(logDir);
479    assertEquals(3, logs.length);
480    for (int i = 0; i < logs.length; ++i) {
481      corruptLog(logs[i], 4);
482    }
483
484    // Restart the store (avoid cleaning up the files, to check the rebuilded trackers)
485    htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false);
486    final LoadCounter loader = new LoadCounter();
487    storeRestart(loader);
488    assertEquals(3, loader.getLoadedCount());  // procs 1, 3 and 5
489    assertEquals(0, loader.getCorruptedCount());
490
491    // Check the Trackers
492    final ArrayList<ProcedureWALFile> walFiles = procStore.getActiveLogs();
493    LOG.info("WALs " + walFiles);
494    assertEquals(4, walFiles.size());
495    LOG.info("Checking wal " + walFiles.get(0));
496    assertUpdated(walFiles.get(0).getTracker(), procs, new int[]{0, 1, 2, 3}, new int[] {4, 5});
497    LOG.info("Checking wal " + walFiles.get(1));
498    assertUpdated(walFiles.get(1).getTracker(), procs, new int[]{2, 3, 4}, new int[] {0, 1, 5});
499    LOG.info("Checking wal " + walFiles.get(2));
500    assertUpdated(walFiles.get(2).getTracker(), procs, new int[]{4, 5}, new int[] {0, 1, 2, 3});
501    LOG.info("Checking global tracker ");
502    assertDeleted(procStore.getStoreTracker(), procs, new int[]{0, 2, 4}, new int[] {1, 3, 5});
503  }
504
505  @Test
506  public void testCorruptedEntries() throws Exception {
507    // Insert something
508    for (int i = 0; i < 100; ++i) {
509      procStore.insert(new TestSequentialProcedure(), null);
510    }
511
512    // Stop the store
513    procStore.stop(false);
514
515    // Remove some byte from the log
516    // (enough to cut the trailer and corrupt some entries)
517    FileStatus[] logs = fs.listStatus(logDir);
518    assertEquals(1, logs.length);
519    corruptLog(logs[0], 1823);
520
521    LoadCounter loader = new LoadCounter();
522    storeRestart(loader);
523    assertTrue(procStore.getCorruptedLogs() != null);
524    assertEquals(1, procStore.getCorruptedLogs().size());
525    assertEquals(87, loader.getLoadedCount());
526    assertEquals(0, loader.getCorruptedCount());
527  }
528
529  @Test
530  public void testCorruptedProcedures() throws Exception {
531    // Insert root-procedures
532    TestProcedure[] rootProcs = new TestProcedure[10];
533    for (int i = 1; i <= rootProcs.length; i++) {
534      rootProcs[i-1] = new TestProcedure(i, 0);
535      procStore.insert(rootProcs[i-1], null);
536      rootProcs[i-1].addStackId(0);
537      procStore.update(rootProcs[i-1]);
538    }
539    // insert root-child txn
540    procStore.rollWriterForTesting();
541    for (int i = 1; i <= rootProcs.length; i++) {
542      TestProcedure b = new TestProcedure(rootProcs.length + i, i);
543      rootProcs[i-1].addStackId(1);
544      procStore.insert(rootProcs[i-1], new Procedure[] { b });
545    }
546    // insert child updates
547    procStore.rollWriterForTesting();
548    for (int i = 1; i <= rootProcs.length; i++) {
549      procStore.update(new TestProcedure(rootProcs.length + i, i));
550    }
551
552    // Stop the store
553    procStore.stop(false);
554
555    // the first log was removed,
556    // we have insert-txn and updates in the others so everything is fine
557    FileStatus[] logs = fs.listStatus(logDir);
558    assertEquals(Arrays.toString(logs), 2, logs.length);
559    Arrays.sort(logs, new Comparator<FileStatus>() {
560      @Override
561      public int compare(FileStatus o1, FileStatus o2) {
562        return o1.getPath().getName().compareTo(o2.getPath().getName());
563      }
564    });
565
566    LoadCounter loader = new LoadCounter();
567    storeRestart(loader);
568    assertEquals(rootProcs.length * 2, loader.getLoadedCount());
569    assertEquals(0, loader.getCorruptedCount());
570
571    // Remove the second log, we have lost all the root/parent references
572    fs.delete(logs[0].getPath(), false);
573    loader.reset();
574    storeRestart(loader);
575    assertEquals(0, loader.getLoadedCount());
576    assertEquals(rootProcs.length, loader.getCorruptedCount());
577    for (Procedure<?> proc : loader.getCorrupted()) {
578      assertTrue(proc.toString(), proc.getParentProcId() <= rootProcs.length);
579      assertTrue(proc.toString(),
580        proc.getProcId() > rootProcs.length && proc.getProcId() <= (rootProcs.length * 2));
581    }
582  }
583
584  @Test
585  public void testRollAndRemove() throws IOException {
586    // Insert something in the log
587    Procedure<?> proc1 = new TestSequentialProcedure();
588    procStore.insert(proc1, null);
589
590    Procedure<?> proc2 = new TestSequentialProcedure();
591    procStore.insert(proc2, null);
592
593    // roll the log, now we have 2
594    procStore.rollWriterForTesting();
595    assertEquals(2, procStore.getActiveLogs().size());
596
597    // everything will be up to date in the second log
598    // so we can remove the first one
599    procStore.update(proc1);
600    procStore.update(proc2);
601    assertEquals(1, procStore.getActiveLogs().size());
602
603    // roll the log, now we have 2
604    procStore.rollWriterForTesting();
605    assertEquals(2, procStore.getActiveLogs().size());
606
607    // remove everything active
608    // so we can remove all the logs
609    procStore.delete(proc1.getProcId());
610    procStore.delete(proc2.getProcId());
611    assertEquals(1, procStore.getActiveLogs().size());
612  }
613
614  @Test
615  public void testFileNotFoundDuringLeaseRecovery() throws IOException {
616    final TestProcedure[] procs = new TestProcedure[3];
617    for (int i = 0; i < procs.length; ++i) {
618      procs[i] = new TestProcedure(i + 1, 0);
619      procStore.insert(procs[i], null);
620    }
621    procStore.rollWriterForTesting();
622    for (int i = 0; i < procs.length; ++i) {
623      procStore.update(procs[i]);
624      procStore.rollWriterForTesting();
625    }
626    procStore.stop(false);
627
628    FileStatus[] status = fs.listStatus(logDir);
629    assertEquals(procs.length + 1, status.length);
630
631    // simulate another active master removing the wals
632    procStore = new WALProcedureStore(htu.getConfiguration(), logDir, null,
633      new LeaseRecovery() {
634        private int count = 0;
635
636        @Override
637        public void recoverFileLease(FileSystem fs, Path path) throws IOException {
638          if (++count <= 2) {
639            fs.delete(path, false);
640            LOG.debug("Simulate FileNotFound at count=" + count + " for " + path);
641            throw new FileNotFoundException("test file not found " + path);
642          }
643          LOG.debug("Simulate recoverFileLease() at count=" + count + " for " + path);
644        }
645      });
646
647    final LoadCounter loader = new LoadCounter();
648    procStore.start(PROCEDURE_STORE_SLOTS);
649    procStore.recoverLease();
650    procStore.load(loader);
651    assertEquals(procs.length, loader.getMaxProcId());
652    assertEquals(1, loader.getRunnableCount());
653    assertEquals(0, loader.getCompletedCount());
654    assertEquals(0, loader.getCorruptedCount());
655  }
656
657  @Test
658  public void testLogFileAlreadyExists() throws IOException {
659    final boolean[] tested = {false};
660    WALProcedureStore mStore = Mockito.spy(procStore);
661
662    Answer<Boolean> ans = new Answer<Boolean>() {
663      @Override
664      public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
665        long logId = ((Long) invocationOnMock.getArgument(0)).longValue();
666        switch ((int) logId) {
667          case 2:
668            // Create a file so that real rollWriter() runs into file exists condition
669            Path logFilePath = mStore.getLogFilePath(logId);
670            mStore.getFileSystem().create(logFilePath);
671            break;
672          case 3:
673            // Success only when we retry with logId 3
674            tested[0] = true;
675          default:
676            break;
677        }
678        return (Boolean) invocationOnMock.callRealMethod();
679      }
680    };
681
682    // First time Store has one log file, next id will be 2
683    Mockito.doAnswer(ans).when(mStore).rollWriter(2);
684    // next time its 3
685    Mockito.doAnswer(ans).when(mStore).rollWriter(3);
686
687    mStore.recoverLease();
688    assertTrue(tested[0]);
689  }
690
691  @Test
692  public void testLoadChildren() throws Exception {
693    TestProcedure a = new TestProcedure(1, 0);
694    TestProcedure b = new TestProcedure(2, 1);
695    TestProcedure c = new TestProcedure(3, 1);
696
697    // INIT
698    procStore.insert(a, null);
699
700    // Run A first step
701    a.addStackId(0);
702    procStore.update(a);
703
704    // Run A second step
705    a.addStackId(1);
706    procStore.insert(a, new Procedure[] { b, c });
707
708    // Run B first step
709    b.addStackId(2);
710    procStore.update(b);
711
712    // Run C first and last step
713    c.addStackId(3);
714    procStore.update(c);
715
716    // Run B second setp
717    b.addStackId(4);
718    procStore.update(b);
719
720    // back to A
721    a.addStackId(5);
722    a.setSuccessState();
723    procStore.delete(a, new long[] { b.getProcId(), c.getProcId() });
724    restartAndAssert(3, 0, 1, 0);
725  }
726
727  @Test
728  public void testBatchDelete() throws Exception {
729    for (int i = 1; i < 10; ++i) {
730      procStore.insert(new TestProcedure(i), null);
731    }
732
733    // delete nothing
734    long[] toDelete = new long[] { 1, 2, 3, 4 };
735    procStore.delete(toDelete, 2, 0);
736    LoadCounter loader = restartAndAssert(9, 9, 0, 0);
737    for (int i = 1; i < 10; ++i) {
738      assertEquals(true, loader.isRunnable(i));
739    }
740
741    // delete the full "toDelete" array (2, 4, 6, 8)
742    toDelete = new long[] { 2, 4, 6, 8 };
743    procStore.delete(toDelete, 0, toDelete.length);
744    loader = restartAndAssert(9, 5, 0, 0);
745    for (int i = 1; i < 10; ++i) {
746      assertEquals(i % 2 != 0, loader.isRunnable(i));
747    }
748
749    // delete a slice of "toDelete" (1, 3)
750    toDelete = new long[] { 5, 7, 1, 3, 9 };
751    procStore.delete(toDelete, 2, 2);
752    loader = restartAndAssert(9, 3, 0, 0);
753    for (int i = 1; i < 10; ++i) {
754      assertEquals(i > 3 && i % 2 != 0, loader.isRunnable(i));
755    }
756
757    // delete a single item (5)
758    toDelete = new long[] { 5 };
759    procStore.delete(toDelete, 0, 1);
760    loader = restartAndAssert(9, 2, 0, 0);
761    for (int i = 1; i < 10; ++i) {
762      assertEquals(i > 5 && i % 2 != 0, loader.isRunnable(i));
763    }
764
765    // delete remaining using a slice of "toDelete" (7, 9)
766    toDelete = new long[] { 0, 7, 9 };
767    procStore.delete(toDelete, 1, 2);
768    loader = restartAndAssert(0, 0, 0, 0);
769    for (int i = 1; i < 10; ++i) {
770      assertEquals(false, loader.isRunnable(i));
771    }
772  }
773
774  @Test
775  public void testBatchInsert() throws Exception {
776    final int count = 10;
777    final TestProcedure[] procs = new TestProcedure[count];
778    for (int i = 0; i < procs.length; ++i) {
779      procs[i] = new TestProcedure(i + 1);
780    }
781    procStore.insert(procs);
782    restartAndAssert(count, count, 0, 0);
783
784    for (int i = 0; i < procs.length; ++i) {
785      final long procId = procs[i].getProcId();
786      procStore.delete(procId);
787      restartAndAssert(procId != count ? count : 0, count - (i + 1), 0, 0);
788    }
789    procStore.removeInactiveLogsForTesting();
790    assertEquals("WALs=" + procStore.getActiveLogs(), 1, procStore.getActiveLogs().size());
791  }
792
793  @Test
794  public void testWALDirAndWALArchiveDir() throws IOException {
795    Configuration conf = htu.getConfiguration();
796    procStore = createWALProcedureStore(conf);
797    assertEquals(procStore.getFileSystem(), procStore.getWalArchiveDir().getFileSystem(conf));
798  }
799
800  private WALProcedureStore createWALProcedureStore(Configuration conf) throws IOException {
801    return new WALProcedureStore(conf, new LeaseRecovery() {
802      @Override
803      public void recoverFileLease(FileSystem fs, Path path) throws IOException {
804        // no-op
805      }
806    });
807  }
808
809  private LoadCounter restartAndAssert(long maxProcId, long runnableCount,
810      int completedCount, int corruptedCount) throws Exception {
811    return ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId,
812      runnableCount, completedCount, corruptedCount);
813  }
814
815  private void corruptLog(final FileStatus logFile, final long dropBytes)
816      throws IOException {
817    assertTrue(logFile.getLen() > dropBytes);
818    LOG.debug("corrupt log " + logFile.getPath() +
819              " size=" + logFile.getLen() + " drop=" + dropBytes);
820    Path tmpPath = new Path(testDir, "corrupted.log");
821    InputStream in = fs.open(logFile.getPath());
822    OutputStream out =  fs.create(tmpPath);
823    IOUtils.copyBytes(in, out, logFile.getLen() - dropBytes, true);
824    if (!fs.rename(tmpPath, logFile.getPath())) {
825      throw new IOException("Unable to rename");
826    }
827  }
828
829  private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
830    LOG.debug("expected: " + procIds);
831    LoadCounter loader = new LoadCounter();
832    storeRestart(loader);
833    assertEquals(procIds.size(), loader.getLoadedCount());
834    assertEquals(0, loader.getCorruptedCount());
835  }
836
837  public static class TestSequentialProcedure extends SequentialProcedure<Void> {
838    private static long seqid = 0;
839
840    public TestSequentialProcedure() {
841      setProcId(++seqid);
842    }
843
844    @Override
845    protected Procedure<Void>[] execute(Void env) {
846      return null;
847    }
848
849    @Override
850    protected void rollback(Void env) {
851    }
852
853    @Override
854    protected boolean abort(Void env) {
855      return false;
856    }
857
858    @Override
859    protected void serializeStateData(ProcedureStateSerializer serializer)
860        throws IOException {
861      long procId = getProcId();
862      if (procId % 2 == 0) {
863        Int64Value.Builder builder = Int64Value.newBuilder().setValue(procId);
864        serializer.serialize(builder.build());
865      }
866    }
867
868    @Override
869    protected void deserializeStateData(ProcedureStateSerializer serializer)
870        throws IOException {
871      long procId = getProcId();
872      if (procId % 2 == 0) {
873        Int64Value value = serializer.deserialize(Int64Value.class);
874        assertEquals(procId, value.getValue());
875      }
876    }
877  }
878}