001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.FileNotFoundException;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.OutputStream;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Comparator;
031import java.util.HashSet;
032import java.util.Set;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileStatus;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
039import org.apache.hadoop.hbase.procedure2.Procedure;
040import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
041import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
042import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
043import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
044import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
045import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
046import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
047import org.apache.hadoop.hbase.testclassification.MasterTests;
048import org.apache.hadoop.hbase.testclassification.SmallTests;
049import org.apache.hadoop.io.IOUtils;
050import org.junit.After;
051import org.junit.Before;
052import org.junit.ClassRule;
053import org.junit.Test;
054import org.junit.experimental.categories.Category;
055import org.mockito.Mockito;
056import org.mockito.invocation.InvocationOnMock;
057import org.mockito.stubbing.Answer;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value;
062
063@Category({MasterTests.class, SmallTests.class})
064public class TestWALProcedureStore {
065  @ClassRule
066  public static final HBaseClassTestRule CLASS_RULE =
067      HBaseClassTestRule.forClass(TestWALProcedureStore.class);
068
069  private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStore.class);
070
071  private static final int PROCEDURE_STORE_SLOTS = 1;
072
073  private WALProcedureStore procStore;
074
075  private HBaseCommonTestingUtility htu;
076  private FileSystem fs;
077  private Path testDir;
078  private Path logDir;
079
080  private void setupConfig(final Configuration conf) {
081    conf.setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, true);
082  }
083
084  @Before
085  public void setUp() throws IOException {
086    htu = new HBaseCommonTestingUtility();
087    testDir = htu.getDataTestDir();
088    fs = testDir.getFileSystem(htu.getConfiguration());
089    assertTrue(testDir.depth() > 1);
090
091    setupConfig(htu.getConfiguration());
092    logDir = new Path(testDir, "proc-logs");
093    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
094    procStore.start(PROCEDURE_STORE_SLOTS);
095    procStore.recoverLease();
096    procStore.load(new LoadCounter());
097  }
098
099  @After
100  public void tearDown() throws IOException {
101    procStore.stop(false);
102    fs.delete(logDir, true);
103  }
104
105  private void storeRestart(ProcedureStore.ProcedureLoader loader) throws Exception {
106    ProcedureTestingUtility.storeRestart(procStore, loader);
107  }
108
109  @Test
110  public void testEmptyRoll() throws Exception {
111    for (int i = 0; i < 10; ++i) {
112      procStore.periodicRollForTesting();
113    }
114    assertEquals(1, procStore.getActiveLogs().size());
115    FileStatus[] status = fs.listStatus(logDir);
116    assertEquals(1, status.length);
117  }
118
119  @Test
120  public void testRestartWithoutData() throws Exception {
121    for (int i = 0; i < 10; ++i) {
122      final LoadCounter loader = new LoadCounter();
123      storeRestart(loader);
124    }
125    LOG.info("ACTIVE WALs " + procStore.getActiveLogs());
126    assertEquals(1, procStore.getActiveLogs().size());
127    FileStatus[] status = fs.listStatus(logDir);
128    assertEquals(1, status.length);
129  }
130
131  /**
132   * Tests that tracker for all old logs are loaded back after procedure store is restarted.
133   */
134  @Test
135  public void trackersLoadedForAllOldLogs() throws Exception {
136    for (int i = 0; i <= 20; ++i) {
137      procStore.insert(new TestProcedure(i), null);
138      if (i > 0 && (i % 5) == 0) {
139        LoadCounter loader = new LoadCounter();
140        storeRestart(loader);
141      }
142    }
143    assertEquals(5, procStore.getActiveLogs().size());
144    for (int i = 0; i < procStore.getActiveLogs().size() - 1; ++i) {
145      ProcedureStoreTracker tracker = procStore.getActiveLogs().get(i).getTracker();
146      assertTrue(tracker != null && !tracker.isEmpty());
147    }
148  }
149
150  @Test
151  public void testWalCleanerSequentialClean() throws Exception {
152    final Procedure<?>[] procs = new Procedure[5];
153    ArrayList<ProcedureWALFile> logs = null;
154
155    // Insert procedures and roll wal after every insert.
156    for (int i = 0; i < procs.length; i++) {
157      procs[i] = new TestSequentialProcedure();
158      procStore.insert(procs[i], null);
159      procStore.rollWriterForTesting();
160      logs = procStore.getActiveLogs();
161      assertEquals(logs.size(), i + 2);  // Extra 1 for current ongoing wal.
162    }
163
164    // Delete procedures in sequential order make sure that only the corresponding wal is deleted
165    // from logs list.
166    final int[] deleteOrder = new int[] { 0, 1, 2, 3, 4 };
167    for (int i = 0; i < deleteOrder.length; i++) {
168      procStore.delete(procs[deleteOrder[i]].getProcId());
169      procStore.removeInactiveLogsForTesting();
170      assertFalse(logs.get(deleteOrder[i]).toString(),
171        procStore.getActiveLogs().contains(logs.get(deleteOrder[i])));
172      assertEquals(procStore.getActiveLogs().size(), procs.length - i);
173    }
174  }
175
176
177  // Test that wal cleaner doesn't create holes in wal files list i.e. it only deletes files if
178  // they are in the starting of the list.
179  @Test
180  public void testWalCleanerNoHoles() throws Exception {
181    final Procedure<?>[] procs = new Procedure[5];
182    ArrayList<ProcedureWALFile> logs = null;
183    // Insert procedures and roll wal after every insert.
184    for (int i = 0; i < procs.length; i++) {
185      procs[i] = new TestSequentialProcedure();
186      procStore.insert(procs[i], null);
187      procStore.rollWriterForTesting();
188      logs = procStore.getActiveLogs();
189      assertEquals(i + 2, logs.size());  // Extra 1 for current ongoing wal.
190    }
191
192    for (int i = 1; i < procs.length; i++) {
193      procStore.delete(procs[i].getProcId());
194    }
195    assertEquals(procs.length + 1, procStore.getActiveLogs().size());
196    procStore.delete(procs[0].getProcId());
197    assertEquals(1, procStore.getActiveLogs().size());
198  }
199
200  @Test
201  public void testWalCleanerUpdates() throws Exception {
202    TestSequentialProcedure p1 = new TestSequentialProcedure();
203    TestSequentialProcedure p2 = new TestSequentialProcedure();
204    procStore.insert(p1, null);
205    procStore.insert(p2, null);
206    procStore.rollWriterForTesting();
207    ProcedureWALFile firstLog = procStore.getActiveLogs().get(0);
208    procStore.update(p1);
209    procStore.rollWriterForTesting();
210    procStore.update(p2);
211    procStore.rollWriterForTesting();
212    procStore.removeInactiveLogsForTesting();
213    assertFalse(procStore.getActiveLogs().contains(firstLog));
214  }
215
216  @Test
217  public void testWalCleanerUpdatesDontLeaveHoles() throws Exception {
218    TestSequentialProcedure p1 = new TestSequentialProcedure();
219    TestSequentialProcedure p2 = new TestSequentialProcedure();
220    procStore.insert(p1, null);
221    procStore.insert(p2, null);
222    procStore.rollWriterForTesting();  // generates first log with p1 + p2
223    ProcedureWALFile log1 = procStore.getActiveLogs().get(0);
224    procStore.update(p2);
225    procStore.rollWriterForTesting();  // generates second log with p2
226    ProcedureWALFile log2 = procStore.getActiveLogs().get(1);
227    procStore.update(p2);
228    procStore.rollWriterForTesting();  // generates third log with p2
229    procStore.removeInactiveLogsForTesting();  // Shouldn't remove 2nd log.
230    assertEquals(4, procStore.getActiveLogs().size());
231    procStore.update(p1);
232    procStore.rollWriterForTesting();  // generates fourth log with p1
233    procStore.removeInactiveLogsForTesting();  // Should remove first two logs.
234    assertEquals(3, procStore.getActiveLogs().size());
235    assertFalse(procStore.getActiveLogs().contains(log1));
236    assertFalse(procStore.getActiveLogs().contains(log2));
237  }
238
239  @Test
240  public void testWalCleanerWithEmptyRolls() throws Exception {
241    final Procedure<?>[] procs = new Procedure[3];
242    for (int i = 0; i < procs.length; ++i) {
243      procs[i] = new TestSequentialProcedure();
244      procStore.insert(procs[i], null);
245    }
246    assertEquals(1, procStore.getActiveLogs().size());
247    procStore.rollWriterForTesting();
248    assertEquals(2, procStore.getActiveLogs().size());
249    procStore.rollWriterForTesting();
250    assertEquals(3, procStore.getActiveLogs().size());
251
252    for (int i = 0; i < procs.length; ++i) {
253      procStore.update(procs[i]);
254      procStore.rollWriterForTesting();
255      procStore.rollWriterForTesting();
256      if (i < (procs.length - 1)) {
257        assertEquals(3 + ((i + 1) * 2), procStore.getActiveLogs().size());
258      }
259    }
260    assertEquals(7, procStore.getActiveLogs().size());
261
262    for (int i = 0; i < procs.length; ++i) {
263      procStore.delete(procs[i].getProcId());
264      assertEquals(7 - ((i + 1) * 2), procStore.getActiveLogs().size());
265    }
266    assertEquals(1, procStore.getActiveLogs().size());
267  }
268
269  @Test
270  public void testEmptyLogLoad() throws Exception {
271    LoadCounter loader = new LoadCounter();
272    storeRestart(loader);
273    assertEquals(0, loader.getMaxProcId());
274    assertEquals(0, loader.getLoadedCount());
275    assertEquals(0, loader.getCorruptedCount());
276  }
277
278  @Test
279  public void testLoad() throws Exception {
280    Set<Long> procIds = new HashSet<>();
281
282    // Insert something in the log
283    Procedure<?> proc1 = new TestSequentialProcedure();
284    procIds.add(proc1.getProcId());
285    procStore.insert(proc1, null);
286
287    Procedure<?> proc2 = new TestSequentialProcedure();
288    Procedure<?>[] child2 = new Procedure[2];
289    child2[0] = new TestSequentialProcedure();
290    child2[1] = new TestSequentialProcedure();
291
292    procIds.add(proc2.getProcId());
293    procIds.add(child2[0].getProcId());
294    procIds.add(child2[1].getProcId());
295    procStore.insert(proc2, child2);
296
297    // Verify that everything is there
298    verifyProcIdsOnRestart(procIds);
299
300    // Update and delete something
301    procStore.update(proc1);
302    procStore.update(child2[1]);
303    procStore.delete(child2[1].getProcId());
304    procIds.remove(child2[1].getProcId());
305
306    // Verify that everything is there
307    verifyProcIdsOnRestart(procIds);
308
309    // Remove 4 byte from the trailers
310    procStore.stop(false);
311    FileStatus[] logs = fs.listStatus(logDir);
312    assertEquals(3, logs.length);
313    for (int i = 0; i < logs.length; ++i) {
314      corruptLog(logs[i], 4);
315    }
316    verifyProcIdsOnRestart(procIds);
317  }
318
319  @Test
320  public void testNoTrailerDoubleRestart() throws Exception {
321    // log-0001: proc 0, 1 and 2 are inserted
322    Procedure<?> proc0 = new TestSequentialProcedure();
323    procStore.insert(proc0, null);
324    Procedure<?> proc1 = new TestSequentialProcedure();
325    procStore.insert(proc1, null);
326    Procedure<?> proc2 = new TestSequentialProcedure();
327    procStore.insert(proc2, null);
328    procStore.rollWriterForTesting();
329
330    // log-0002: proc 1 deleted
331    procStore.delete(proc1.getProcId());
332    procStore.rollWriterForTesting();
333
334    // log-0003: proc 2 is update
335    procStore.update(proc2);
336    procStore.rollWriterForTesting();
337
338    // log-0004: proc 2 deleted
339    procStore.delete(proc2.getProcId());
340
341    // stop the store and remove the trailer
342    procStore.stop(false);
343    FileStatus[] logs = fs.listStatus(logDir);
344    assertEquals(4, logs.length);
345    for (int i = 0; i < logs.length; ++i) {
346      corruptLog(logs[i], 4);
347    }
348
349    // Test Load 1
350    // Restart the store (avoid cleaning up the files, to check the rebuilded trackers)
351    htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false);
352    LoadCounter loader = new LoadCounter();
353    storeRestart(loader);
354    assertEquals(1, loader.getLoadedCount());
355    assertEquals(0, loader.getCorruptedCount());
356
357    // Test Load 2
358    assertEquals(5, fs.listStatus(logDir).length);
359    loader = new LoadCounter();
360    storeRestart(loader);
361    assertEquals(1, loader.getLoadedCount());
362    assertEquals(0, loader.getCorruptedCount());
363
364    // remove proc-0
365    procStore.delete(proc0.getProcId());
366    procStore.periodicRollForTesting();
367    assertEquals(1, fs.listStatus(logDir).length);
368    storeRestart(loader);
369  }
370
371  @Test
372  public void testProcIdHoles() throws Exception {
373    // Insert
374    for (int i = 0; i < 100; i += 2) {
375      procStore.insert(new TestProcedure(i), null);
376      if (i > 0 && (i % 10) == 0) {
377        LoadCounter loader = new LoadCounter();
378        storeRestart(loader);
379        assertEquals(0, loader.getCorruptedCount());
380        assertEquals((i / 2) + 1, loader.getLoadedCount());
381      }
382    }
383    assertEquals(10, procStore.getActiveLogs().size());
384
385    // Delete
386    for (int i = 0; i < 100; i += 2) {
387      procStore.delete(i);
388    }
389    assertEquals(1, procStore.getActiveLogs().size());
390
391    LoadCounter loader = new LoadCounter();
392    storeRestart(loader);
393    assertEquals(0, loader.getLoadedCount());
394    assertEquals(0, loader.getCorruptedCount());
395  }
396
397  @Test
398  public void testCorruptedTrailer() throws Exception {
399    // Insert something
400    for (int i = 0; i < 100; ++i) {
401      procStore.insert(new TestSequentialProcedure(), null);
402    }
403
404    // Stop the store
405    procStore.stop(false);
406
407    // Remove 4 byte from the trailer
408    FileStatus[] logs = fs.listStatus(logDir);
409    assertEquals(1, logs.length);
410    corruptLog(logs[0], 4);
411
412    LoadCounter loader = new LoadCounter();
413    storeRestart(loader);
414    assertEquals(100, loader.getLoadedCount());
415    assertEquals(0, loader.getCorruptedCount());
416  }
417
418  private static void assertUpdated(final ProcedureStoreTracker tracker,
419      final Procedure<?>[] procs, final int[] updatedProcs, final int[] nonUpdatedProcs) {
420    for (int index : updatedProcs) {
421      long procId = procs[index].getProcId();
422      assertTrue("Procedure id : " + procId, tracker.isModified(procId));
423    }
424    for (int index : nonUpdatedProcs) {
425      long procId = procs[index].getProcId();
426      assertFalse("Procedure id : " + procId, tracker.isModified(procId));
427    }
428  }
429
430  private static void assertDeleted(final ProcedureStoreTracker tracker,
431      final Procedure<?>[] procs, final int[] deletedProcs, final int[] nonDeletedProcs) {
432    for (int index : deletedProcs) {
433      long procId = procs[index].getProcId();
434      assertEquals("Procedure id : " + procId,
435          ProcedureStoreTracker.DeleteState.YES, tracker.isDeleted(procId));
436    }
437    for (int index : nonDeletedProcs) {
438      long procId = procs[index].getProcId();
439      assertEquals("Procedure id : " + procId,
440          ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(procId));
441    }
442  }
443
444  @Test
445  public void testCorruptedTrailersRebuild() throws Exception {
446    final Procedure<?>[] procs = new Procedure[6];
447    for (int i = 0; i < procs.length; ++i) {
448      procs[i] = new TestSequentialProcedure();
449    }
450    // Log State (I=insert, U=updated, D=delete)
451    //   | log 1 | log 2 | log 3 |
452    // 0 | I, D  |       |       |
453    // 1 | I     |       |       |
454    // 2 | I     | D     |       |
455    // 3 | I     | U     |       |
456    // 4 |       | I     | D     |
457    // 5 |       |       | I     |
458    procStore.insert(procs[0], null);
459    procStore.insert(procs[1], null);
460    procStore.insert(procs[2], null);
461    procStore.insert(procs[3], null);
462    procStore.delete(procs[0].getProcId());
463    procStore.rollWriterForTesting();
464    procStore.delete(procs[2].getProcId());
465    procStore.update(procs[3]);
466    procStore.insert(procs[4], null);
467    procStore.rollWriterForTesting();
468    procStore.delete(procs[4].getProcId());
469    procStore.insert(procs[5], null);
470
471    // Stop the store
472    procStore.stop(false);
473
474    // Remove 4 byte from the trailers
475    final FileStatus[] logs = fs.listStatus(logDir);
476    assertEquals(3, logs.length);
477    for (int i = 0; i < logs.length; ++i) {
478      corruptLog(logs[i], 4);
479    }
480
481    // Restart the store (avoid cleaning up the files, to check the rebuilded trackers)
482    htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false);
483    final LoadCounter loader = new LoadCounter();
484    storeRestart(loader);
485    assertEquals(3, loader.getLoadedCount());  // procs 1, 3 and 5
486    assertEquals(0, loader.getCorruptedCount());
487
488    // Check the Trackers
489    final ArrayList<ProcedureWALFile> walFiles = procStore.getActiveLogs();
490    LOG.info("WALs " + walFiles);
491    assertEquals(4, walFiles.size());
492    LOG.info("Checking wal " + walFiles.get(0));
493    assertUpdated(walFiles.get(0).getTracker(), procs, new int[]{0, 1, 2, 3}, new int[] {4, 5});
494    LOG.info("Checking wal " + walFiles.get(1));
495    assertUpdated(walFiles.get(1).getTracker(), procs, new int[]{2, 3, 4}, new int[] {0, 1, 5});
496    LOG.info("Checking wal " + walFiles.get(2));
497    assertUpdated(walFiles.get(2).getTracker(), procs, new int[]{4, 5}, new int[] {0, 1, 2, 3});
498    LOG.info("Checking global tracker ");
499    assertDeleted(procStore.getStoreTracker(), procs, new int[]{0, 2, 4}, new int[] {1, 3, 5});
500  }
501
502  @Test
503  public void testCorruptedEntries() throws Exception {
504    // Insert something
505    for (int i = 0; i < 100; ++i) {
506      procStore.insert(new TestSequentialProcedure(), null);
507    }
508
509    // Stop the store
510    procStore.stop(false);
511
512    // Remove some byte from the log
513    // (enough to cut the trailer and corrupt some entries)
514    FileStatus[] logs = fs.listStatus(logDir);
515    assertEquals(1, logs.length);
516    corruptLog(logs[0], 1823);
517
518    LoadCounter loader = new LoadCounter();
519    storeRestart(loader);
520    assertTrue(procStore.getCorruptedLogs() != null);
521    assertEquals(1, procStore.getCorruptedLogs().size());
522    assertEquals(87, loader.getLoadedCount());
523    assertEquals(0, loader.getCorruptedCount());
524  }
525
526  @Test
527  public void testCorruptedProcedures() throws Exception {
528    // Insert root-procedures
529    TestProcedure[] rootProcs = new TestProcedure[10];
530    for (int i = 1; i <= rootProcs.length; i++) {
531      rootProcs[i-1] = new TestProcedure(i, 0);
532      procStore.insert(rootProcs[i-1], null);
533      rootProcs[i-1].addStackId(0);
534      procStore.update(rootProcs[i-1]);
535    }
536    // insert root-child txn
537    procStore.rollWriterForTesting();
538    for (int i = 1; i <= rootProcs.length; i++) {
539      TestProcedure b = new TestProcedure(rootProcs.length + i, i);
540      rootProcs[i-1].addStackId(1);
541      procStore.insert(rootProcs[i-1], new Procedure[] { b });
542    }
543    // insert child updates
544    procStore.rollWriterForTesting();
545    for (int i = 1; i <= rootProcs.length; i++) {
546      procStore.update(new TestProcedure(rootProcs.length + i, i));
547    }
548
549    // Stop the store
550    procStore.stop(false);
551
552    // the first log was removed,
553    // we have insert-txn and updates in the others so everything is fine
554    FileStatus[] logs = fs.listStatus(logDir);
555    assertEquals(Arrays.toString(logs), 2, logs.length);
556    Arrays.sort(logs, new Comparator<FileStatus>() {
557      @Override
558      public int compare(FileStatus o1, FileStatus o2) {
559        return o1.getPath().getName().compareTo(o2.getPath().getName());
560      }
561    });
562
563    LoadCounter loader = new LoadCounter();
564    storeRestart(loader);
565    assertEquals(rootProcs.length * 2, loader.getLoadedCount());
566    assertEquals(0, loader.getCorruptedCount());
567
568    // Remove the second log, we have lost all the root/parent references
569    fs.delete(logs[0].getPath(), false);
570    loader.reset();
571    storeRestart(loader);
572    assertEquals(0, loader.getLoadedCount());
573    assertEquals(rootProcs.length, loader.getCorruptedCount());
574    for (Procedure<?> proc : loader.getCorrupted()) {
575      assertTrue(proc.toString(), proc.getParentProcId() <= rootProcs.length);
576      assertTrue(proc.toString(),
577        proc.getProcId() > rootProcs.length && proc.getProcId() <= (rootProcs.length * 2));
578    }
579  }
580
581  @Test
582  public void testRollAndRemove() throws IOException {
583    // Insert something in the log
584    Procedure<?> proc1 = new TestSequentialProcedure();
585    procStore.insert(proc1, null);
586
587    Procedure<?> proc2 = new TestSequentialProcedure();
588    procStore.insert(proc2, null);
589
590    // roll the log, now we have 2
591    procStore.rollWriterForTesting();
592    assertEquals(2, procStore.getActiveLogs().size());
593
594    // everything will be up to date in the second log
595    // so we can remove the first one
596    procStore.update(proc1);
597    procStore.update(proc2);
598    assertEquals(1, procStore.getActiveLogs().size());
599
600    // roll the log, now we have 2
601    procStore.rollWriterForTesting();
602    assertEquals(2, procStore.getActiveLogs().size());
603
604    // remove everything active
605    // so we can remove all the logs
606    procStore.delete(proc1.getProcId());
607    procStore.delete(proc2.getProcId());
608    assertEquals(1, procStore.getActiveLogs().size());
609  }
610
611  @Test
612  public void testFileNotFoundDuringLeaseRecovery() throws IOException {
613    final TestProcedure[] procs = new TestProcedure[3];
614    for (int i = 0; i < procs.length; ++i) {
615      procs[i] = new TestProcedure(i + 1, 0);
616      procStore.insert(procs[i], null);
617    }
618    procStore.rollWriterForTesting();
619    for (int i = 0; i < procs.length; ++i) {
620      procStore.update(procs[i]);
621      procStore.rollWriterForTesting();
622    }
623    procStore.stop(false);
624
625    FileStatus[] status = fs.listStatus(logDir);
626    assertEquals(procs.length + 1, status.length);
627
628    // simulate another active master removing the wals
629    procStore = new WALProcedureStore(htu.getConfiguration(), logDir, null,
630      new WALProcedureStore.LeaseRecovery() {
631        private int count = 0;
632
633        @Override
634        public void recoverFileLease(FileSystem fs, Path path) throws IOException {
635          if (++count <= 2) {
636            fs.delete(path, false);
637            LOG.debug("Simulate FileNotFound at count=" + count + " for " + path);
638            throw new FileNotFoundException("test file not found " + path);
639          }
640          LOG.debug("Simulate recoverFileLease() at count=" + count + " for " + path);
641        }
642      });
643
644    final LoadCounter loader = new LoadCounter();
645    procStore.start(PROCEDURE_STORE_SLOTS);
646    procStore.recoverLease();
647    procStore.load(loader);
648    assertEquals(procs.length, loader.getMaxProcId());
649    assertEquals(1, loader.getRunnableCount());
650    assertEquals(0, loader.getCompletedCount());
651    assertEquals(0, loader.getCorruptedCount());
652  }
653
654  @Test
655  public void testLogFileAleadExists() throws IOException {
656    final boolean[] tested = {false};
657    WALProcedureStore mStore = Mockito.spy(procStore);
658
659    Answer<Boolean> ans = new Answer<Boolean>() {
660      @Override
661      public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
662        long logId = ((Long) invocationOnMock.getArgument(0)).longValue();
663        switch ((int) logId) {
664          case 2:
665            // Create a file so that real rollWriter() runs into file exists condition
666            Path logFilePath = mStore.getLogFilePath(logId);
667            mStore.getFileSystem().create(logFilePath);
668            break;
669          case 3:
670            // Success only when we retry with logId 3
671            tested[0] = true;
672          default:
673            break;
674        }
675        return (Boolean) invocationOnMock.callRealMethod();
676      }
677    };
678
679    // First time Store has one log file, next id will be 2
680    Mockito.doAnswer(ans).when(mStore).rollWriter(2);
681    // next time its 3
682    Mockito.doAnswer(ans).when(mStore).rollWriter(3);
683
684    mStore.recoverLease();
685    assertTrue(tested[0]);
686  }
687
688  @Test
689  public void testLoadChildren() throws Exception {
690    TestProcedure a = new TestProcedure(1, 0);
691    TestProcedure b = new TestProcedure(2, 1);
692    TestProcedure c = new TestProcedure(3, 1);
693
694    // INIT
695    procStore.insert(a, null);
696
697    // Run A first step
698    a.addStackId(0);
699    procStore.update(a);
700
701    // Run A second step
702    a.addStackId(1);
703    procStore.insert(a, new Procedure[] { b, c });
704
705    // Run B first step
706    b.addStackId(2);
707    procStore.update(b);
708
709    // Run C first and last step
710    c.addStackId(3);
711    procStore.update(c);
712
713    // Run B second setp
714    b.addStackId(4);
715    procStore.update(b);
716
717    // back to A
718    a.addStackId(5);
719    a.setSuccessState();
720    procStore.delete(a, new long[] { b.getProcId(), c.getProcId() });
721    restartAndAssert(3, 0, 1, 0);
722  }
723
724  @Test
725  public void testBatchDelete() throws Exception {
726    for (int i = 1; i < 10; ++i) {
727      procStore.insert(new TestProcedure(i), null);
728    }
729
730    // delete nothing
731    long[] toDelete = new long[] { 1, 2, 3, 4 };
732    procStore.delete(toDelete, 2, 0);
733    LoadCounter loader = restartAndAssert(9, 9, 0, 0);
734    for (int i = 1; i < 10; ++i) {
735      assertEquals(true, loader.isRunnable(i));
736    }
737
738    // delete the full "toDelete" array (2, 4, 6, 8)
739    toDelete = new long[] { 2, 4, 6, 8 };
740    procStore.delete(toDelete, 0, toDelete.length);
741    loader = restartAndAssert(9, 5, 0, 0);
742    for (int i = 1; i < 10; ++i) {
743      assertEquals(i % 2 != 0, loader.isRunnable(i));
744    }
745
746    // delete a slice of "toDelete" (1, 3)
747    toDelete = new long[] { 5, 7, 1, 3, 9 };
748    procStore.delete(toDelete, 2, 2);
749    loader = restartAndAssert(9, 3, 0, 0);
750    for (int i = 1; i < 10; ++i) {
751      assertEquals(i > 3 && i % 2 != 0, loader.isRunnable(i));
752    }
753
754    // delete a single item (5)
755    toDelete = new long[] { 5 };
756    procStore.delete(toDelete, 0, 1);
757    loader = restartAndAssert(9, 2, 0, 0);
758    for (int i = 1; i < 10; ++i) {
759      assertEquals(i > 5 && i % 2 != 0, loader.isRunnable(i));
760    }
761
762    // delete remaining using a slice of "toDelete" (7, 9)
763    toDelete = new long[] { 0, 7, 9 };
764    procStore.delete(toDelete, 1, 2);
765    loader = restartAndAssert(0, 0, 0, 0);
766    for (int i = 1; i < 10; ++i) {
767      assertEquals(false, loader.isRunnable(i));
768    }
769  }
770
771  @Test
772  public void testBatchInsert() throws Exception {
773    final int count = 10;
774    final TestProcedure[] procs = new TestProcedure[count];
775    for (int i = 0; i < procs.length; ++i) {
776      procs[i] = new TestProcedure(i + 1);
777    }
778    procStore.insert(procs);
779    restartAndAssert(count, count, 0, 0);
780
781    for (int i = 0; i < procs.length; ++i) {
782      final long procId = procs[i].getProcId();
783      procStore.delete(procId);
784      restartAndAssert(procId != count ? count : 0, count - (i + 1), 0, 0);
785    }
786    procStore.removeInactiveLogsForTesting();
787    assertEquals("WALs=" + procStore.getActiveLogs(), 1, procStore.getActiveLogs().size());
788  }
789
790  @Test
791  public void testWALDirAndWALArchiveDir() throws IOException {
792    Configuration conf = htu.getConfiguration();
793    procStore = createWALProcedureStore(conf);
794    assertEquals(procStore.getFileSystem(), procStore.getWalArchiveDir().getFileSystem(conf));
795  }
796
797  private WALProcedureStore createWALProcedureStore(Configuration conf) throws IOException {
798    return new WALProcedureStore(conf, new WALProcedureStore.LeaseRecovery() {
799      @Override
800      public void recoverFileLease(FileSystem fs, Path path) throws IOException {
801        // no-op
802      }
803    });
804  }
805
806  private LoadCounter restartAndAssert(long maxProcId, long runnableCount,
807      int completedCount, int corruptedCount) throws Exception {
808    return ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId,
809      runnableCount, completedCount, corruptedCount);
810  }
811
812  private void corruptLog(final FileStatus logFile, final long dropBytes)
813      throws IOException {
814    assertTrue(logFile.getLen() > dropBytes);
815    LOG.debug("corrupt log " + logFile.getPath() +
816              " size=" + logFile.getLen() + " drop=" + dropBytes);
817    Path tmpPath = new Path(testDir, "corrupted.log");
818    InputStream in = fs.open(logFile.getPath());
819    OutputStream out =  fs.create(tmpPath);
820    IOUtils.copyBytes(in, out, logFile.getLen() - dropBytes, true);
821    if (!fs.rename(tmpPath, logFile.getPath())) {
822      throw new IOException("Unable to rename");
823    }
824  }
825
826  private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
827    LOG.debug("expected: " + procIds);
828    LoadCounter loader = new LoadCounter();
829    storeRestart(loader);
830    assertEquals(procIds.size(), loader.getLoadedCount());
831    assertEquals(0, loader.getCorruptedCount());
832  }
833
834  public static class TestSequentialProcedure extends SequentialProcedure<Void> {
835    private static long seqid = 0;
836
837    public TestSequentialProcedure() {
838      setProcId(++seqid);
839    }
840
841    @Override
842    protected Procedure<Void>[] execute(Void env) {
843      return null;
844    }
845
846    @Override
847    protected void rollback(Void env) {
848    }
849
850    @Override
851    protected boolean abort(Void env) {
852      return false;
853    }
854
855    @Override
856    protected void serializeStateData(ProcedureStateSerializer serializer)
857        throws IOException {
858      long procId = getProcId();
859      if (procId % 2 == 0) {
860        Int64Value.Builder builder = Int64Value.newBuilder().setValue(procId);
861        serializer.serialize(builder.build());
862      }
863    }
864
865    @Override
866    protected void deserializeStateData(ProcedureStateSerializer serializer)
867        throws IOException {
868      long procId = getProcId();
869      if (procId % 2 == 0) {
870        Int64Value value = serializer.deserialize(Int64Value.class);
871        assertEquals(procId, value.getValue());
872      }
873    }
874  }
875}