001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.FileNotFoundException;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.OutputStream;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Comparator;
031import java.util.HashSet;
032import java.util.Set;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileStatus;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
039import org.apache.hadoop.hbase.procedure2.Procedure;
040import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
041import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
042import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
043import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
044import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
045import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
046import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
047import org.apache.hadoop.hbase.testclassification.MasterTests;
048import org.apache.hadoop.hbase.testclassification.SmallTests;
049import org.apache.hadoop.io.IOUtils;
050import org.junit.After;
051import org.junit.Before;
052import org.junit.ClassRule;
053import org.junit.Test;
054import org.junit.experimental.categories.Category;
055import org.mockito.Mockito;
056import org.mockito.invocation.InvocationOnMock;
057import org.mockito.stubbing.Answer;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value;
062
063@Category({MasterTests.class, SmallTests.class})
064public class TestWALProcedureStore {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068      HBaseClassTestRule.forClass(TestWALProcedureStore.class);
069
070  private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStore.class);
071
072  private static final int PROCEDURE_STORE_SLOTS = 1;
073
074  private WALProcedureStore procStore;
075
076  private HBaseCommonTestingUtility htu;
077  private FileSystem fs;
078  private Path testDir;
079  private Path logDir;
080
081  private void setupConfig(final Configuration conf) {
082    conf.setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, true);
083  }
084
085  @Before
086  public void setUp() throws IOException {
087    htu = new HBaseCommonTestingUtility();
088    testDir = htu.getDataTestDir();
089    fs = testDir.getFileSystem(htu.getConfiguration());
090    assertTrue(testDir.depth() > 1);
091
092    setupConfig(htu.getConfiguration());
093    logDir = new Path(testDir, "proc-logs");
094    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
095    procStore.start(PROCEDURE_STORE_SLOTS);
096    procStore.recoverLease();
097    procStore.load(new LoadCounter());
098  }
099
100  @After
101  public void tearDown() throws IOException {
102    procStore.stop(false);
103    fs.delete(logDir, true);
104  }
105
106  private void storeRestart(ProcedureStore.ProcedureLoader loader) throws Exception {
107    ProcedureTestingUtility.storeRestart(procStore, loader);
108  }
109
110  @Test
111  public void testEmptyRoll() throws Exception {
112    for (int i = 0; i < 10; ++i) {
113      procStore.periodicRollForTesting();
114    }
115    assertEquals(1, procStore.getActiveLogs().size());
116    FileStatus[] status = fs.listStatus(logDir);
117    assertEquals(1, status.length);
118  }
119
120  @Test
121  public void testRestartWithoutData() throws Exception {
122    for (int i = 0; i < 10; ++i) {
123      final LoadCounter loader = new LoadCounter();
124      storeRestart(loader);
125    }
126    LOG.info("ACTIVE WALs " + procStore.getActiveLogs());
127    assertEquals(1, procStore.getActiveLogs().size());
128    FileStatus[] status = fs.listStatus(logDir);
129    assertEquals(1, status.length);
130  }
131
132  /**
133   * Tests that tracker for all old logs are loaded back after procedure store is restarted.
134   */
135  @Test
136  public void trackersLoadedForAllOldLogs() throws Exception {
137    for (int i = 0; i <= 20; ++i) {
138      procStore.insert(new TestProcedure(i), null);
139      if (i > 0 && (i % 5) == 0) {
140        LoadCounter loader = new LoadCounter();
141        storeRestart(loader);
142      }
143    }
144    assertEquals(5, procStore.getActiveLogs().size());
145    for (int i = 0; i < procStore.getActiveLogs().size() - 1; ++i) {
146      ProcedureStoreTracker tracker = procStore.getActiveLogs().get(i).getTracker();
147      assertTrue(tracker != null && !tracker.isEmpty());
148    }
149  }
150
151  @Test
152  public void testWalCleanerSequentialClean() throws Exception {
153    final Procedure<?>[] procs = new Procedure[5];
154    ArrayList<ProcedureWALFile> logs = null;
155
156    // Insert procedures and roll wal after every insert.
157    for (int i = 0; i < procs.length; i++) {
158      procs[i] = new TestSequentialProcedure();
159      procStore.insert(procs[i], null);
160      procStore.rollWriterForTesting();
161      logs = procStore.getActiveLogs();
162      assertEquals(logs.size(), i + 2);  // Extra 1 for current ongoing wal.
163    }
164
165    // Delete procedures in sequential order make sure that only the corresponding wal is deleted
166    // from logs list.
167    final int[] deleteOrder = new int[] { 0, 1, 2, 3, 4 };
168    for (int i = 0; i < deleteOrder.length; i++) {
169      procStore.delete(procs[deleteOrder[i]].getProcId());
170      procStore.removeInactiveLogsForTesting();
171      assertFalse(logs.get(deleteOrder[i]).toString(),
172        procStore.getActiveLogs().contains(logs.get(deleteOrder[i])));
173      assertEquals(procStore.getActiveLogs().size(), procs.length - i);
174    }
175  }
176
177
178  // Test that wal cleaner doesn't create holes in wal files list i.e. it only deletes files if
179  // they are in the starting of the list.
180  @Test
181  public void testWalCleanerNoHoles() throws Exception {
182    final Procedure<?>[] procs = new Procedure[5];
183    ArrayList<ProcedureWALFile> logs = null;
184    // Insert procedures and roll wal after every insert.
185    for (int i = 0; i < procs.length; i++) {
186      procs[i] = new TestSequentialProcedure();
187      procStore.insert(procs[i], null);
188      procStore.rollWriterForTesting();
189      logs = procStore.getActiveLogs();
190      assertEquals(i + 2, logs.size());  // Extra 1 for current ongoing wal.
191    }
192
193    for (int i = 1; i < procs.length; i++) {
194      procStore.delete(procs[i].getProcId());
195    }
196    assertEquals(procs.length + 1, procStore.getActiveLogs().size());
197    procStore.delete(procs[0].getProcId());
198    assertEquals(1, procStore.getActiveLogs().size());
199  }
200
201  @Test
202  public void testWalCleanerUpdates() throws Exception {
203    TestSequentialProcedure p1 = new TestSequentialProcedure();
204    TestSequentialProcedure p2 = new TestSequentialProcedure();
205    procStore.insert(p1, null);
206    procStore.insert(p2, null);
207    procStore.rollWriterForTesting();
208    ProcedureWALFile firstLog = procStore.getActiveLogs().get(0);
209    procStore.update(p1);
210    procStore.rollWriterForTesting();
211    procStore.update(p2);
212    procStore.rollWriterForTesting();
213    procStore.removeInactiveLogsForTesting();
214    assertFalse(procStore.getActiveLogs().contains(firstLog));
215  }
216
217  @Test
218  public void testWalCleanerUpdatesDontLeaveHoles() throws Exception {
219    TestSequentialProcedure p1 = new TestSequentialProcedure();
220    TestSequentialProcedure p2 = new TestSequentialProcedure();
221    procStore.insert(p1, null);
222    procStore.insert(p2, null);
223    procStore.rollWriterForTesting();  // generates first log with p1 + p2
224    ProcedureWALFile log1 = procStore.getActiveLogs().get(0);
225    procStore.update(p2);
226    procStore.rollWriterForTesting();  // generates second log with p2
227    ProcedureWALFile log2 = procStore.getActiveLogs().get(1);
228    procStore.update(p2);
229    procStore.rollWriterForTesting();  // generates third log with p2
230    procStore.removeInactiveLogsForTesting();  // Shouldn't remove 2nd log.
231    assertEquals(4, procStore.getActiveLogs().size());
232    procStore.update(p1);
233    procStore.rollWriterForTesting();  // generates fourth log with p1
234    procStore.removeInactiveLogsForTesting();  // Should remove first two logs.
235    assertEquals(3, procStore.getActiveLogs().size());
236    assertFalse(procStore.getActiveLogs().contains(log1));
237    assertFalse(procStore.getActiveLogs().contains(log2));
238  }
239
240  @Test
241  public void testWalCleanerWithEmptyRolls() throws Exception {
242    final Procedure<?>[] procs = new Procedure[3];
243    for (int i = 0; i < procs.length; ++i) {
244      procs[i] = new TestSequentialProcedure();
245      procStore.insert(procs[i], null);
246    }
247    assertEquals(1, procStore.getActiveLogs().size());
248    procStore.rollWriterForTesting();
249    assertEquals(2, procStore.getActiveLogs().size());
250    procStore.rollWriterForTesting();
251    assertEquals(3, procStore.getActiveLogs().size());
252
253    for (int i = 0; i < procs.length; ++i) {
254      procStore.update(procs[i]);
255      procStore.rollWriterForTesting();
256      procStore.rollWriterForTesting();
257      if (i < (procs.length - 1)) {
258        assertEquals(3 + ((i + 1) * 2), procStore.getActiveLogs().size());
259      }
260    }
261    assertEquals(7, procStore.getActiveLogs().size());
262
263    for (int i = 0; i < procs.length; ++i) {
264      procStore.delete(procs[i].getProcId());
265      assertEquals(7 - ((i + 1) * 2), procStore.getActiveLogs().size());
266    }
267    assertEquals(1, procStore.getActiveLogs().size());
268  }
269
270  @Test
271  public void testEmptyLogLoad() throws Exception {
272    LoadCounter loader = new LoadCounter();
273    storeRestart(loader);
274    assertEquals(0, loader.getMaxProcId());
275    assertEquals(0, loader.getLoadedCount());
276    assertEquals(0, loader.getCorruptedCount());
277  }
278
279  @Test
280  public void testLoad() throws Exception {
281    Set<Long> procIds = new HashSet<>();
282
283    // Insert something in the log
284    Procedure<?> proc1 = new TestSequentialProcedure();
285    procIds.add(proc1.getProcId());
286    procStore.insert(proc1, null);
287
288    Procedure<?> proc2 = new TestSequentialProcedure();
289    Procedure<?>[] child2 = new Procedure[2];
290    child2[0] = new TestSequentialProcedure();
291    child2[1] = new TestSequentialProcedure();
292
293    procIds.add(proc2.getProcId());
294    procIds.add(child2[0].getProcId());
295    procIds.add(child2[1].getProcId());
296    procStore.insert(proc2, child2);
297
298    // Verify that everything is there
299    verifyProcIdsOnRestart(procIds);
300
301    // Update and delete something
302    procStore.update(proc1);
303    procStore.update(child2[1]);
304    procStore.delete(child2[1].getProcId());
305    procIds.remove(child2[1].getProcId());
306
307    // Verify that everything is there
308    verifyProcIdsOnRestart(procIds);
309
310    // Remove 4 byte from the trailers
311    procStore.stop(false);
312    FileStatus[] logs = fs.listStatus(logDir);
313    assertEquals(3, logs.length);
314    for (int i = 0; i < logs.length; ++i) {
315      corruptLog(logs[i], 4);
316    }
317    verifyProcIdsOnRestart(procIds);
318  }
319
320  @Test
321  public void testNoTrailerDoubleRestart() throws Exception {
322    // log-0001: proc 0, 1 and 2 are inserted
323    Procedure<?> proc0 = new TestSequentialProcedure();
324    procStore.insert(proc0, null);
325    Procedure<?> proc1 = new TestSequentialProcedure();
326    procStore.insert(proc1, null);
327    Procedure<?> proc2 = new TestSequentialProcedure();
328    procStore.insert(proc2, null);
329    procStore.rollWriterForTesting();
330
331    // log-0002: proc 1 deleted
332    procStore.delete(proc1.getProcId());
333    procStore.rollWriterForTesting();
334
335    // log-0003: proc 2 is update
336    procStore.update(proc2);
337    procStore.rollWriterForTesting();
338
339    // log-0004: proc 2 deleted
340    procStore.delete(proc2.getProcId());
341
342    // stop the store and remove the trailer
343    procStore.stop(false);
344    FileStatus[] logs = fs.listStatus(logDir);
345    assertEquals(4, logs.length);
346    for (int i = 0; i < logs.length; ++i) {
347      corruptLog(logs[i], 4);
348    }
349
350    // Test Load 1
351    // Restart the store (avoid cleaning up the files, to check the rebuilded trackers)
352    htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false);
353    LoadCounter loader = new LoadCounter();
354    storeRestart(loader);
355    assertEquals(1, loader.getLoadedCount());
356    assertEquals(0, loader.getCorruptedCount());
357
358    // Test Load 2
359    assertEquals(5, fs.listStatus(logDir).length);
360    loader = new LoadCounter();
361    storeRestart(loader);
362    assertEquals(1, loader.getLoadedCount());
363    assertEquals(0, loader.getCorruptedCount());
364
365    // remove proc-0
366    procStore.delete(proc0.getProcId());
367    procStore.periodicRollForTesting();
368    assertEquals(1, fs.listStatus(logDir).length);
369    storeRestart(loader);
370  }
371
372  @Test
373  public void testProcIdHoles() throws Exception {
374    // Insert
375    for (int i = 0; i < 100; i += 2) {
376      procStore.insert(new TestProcedure(i), null);
377      if (i > 0 && (i % 10) == 0) {
378        LoadCounter loader = new LoadCounter();
379        storeRestart(loader);
380        assertEquals(0, loader.getCorruptedCount());
381        assertEquals((i / 2) + 1, loader.getLoadedCount());
382      }
383    }
384    assertEquals(10, procStore.getActiveLogs().size());
385
386    // Delete
387    for (int i = 0; i < 100; i += 2) {
388      procStore.delete(i);
389    }
390    assertEquals(1, procStore.getActiveLogs().size());
391
392    LoadCounter loader = new LoadCounter();
393    storeRestart(loader);
394    assertEquals(0, loader.getLoadedCount());
395    assertEquals(0, loader.getCorruptedCount());
396  }
397
398  @Test
399  public void testCorruptedTrailer() throws Exception {
400    // Insert something
401    for (int i = 0; i < 100; ++i) {
402      procStore.insert(new TestSequentialProcedure(), null);
403    }
404
405    // Stop the store
406    procStore.stop(false);
407
408    // Remove 4 byte from the trailer
409    FileStatus[] logs = fs.listStatus(logDir);
410    assertEquals(1, logs.length);
411    corruptLog(logs[0], 4);
412
413    LoadCounter loader = new LoadCounter();
414    storeRestart(loader);
415    assertEquals(100, loader.getLoadedCount());
416    assertEquals(0, loader.getCorruptedCount());
417  }
418
419  private static void assertUpdated(final ProcedureStoreTracker tracker,
420      final Procedure<?>[] procs, final int[] updatedProcs, final int[] nonUpdatedProcs) {
421    for (int index : updatedProcs) {
422      long procId = procs[index].getProcId();
423      assertTrue("Procedure id : " + procId, tracker.isModified(procId));
424    }
425    for (int index : nonUpdatedProcs) {
426      long procId = procs[index].getProcId();
427      assertFalse("Procedure id : " + procId, tracker.isModified(procId));
428    }
429  }
430
431  private static void assertDeleted(final ProcedureStoreTracker tracker,
432      final Procedure<?>[] procs, final int[] deletedProcs, final int[] nonDeletedProcs) {
433    for (int index : deletedProcs) {
434      long procId = procs[index].getProcId();
435      assertEquals("Procedure id : " + procId,
436          ProcedureStoreTracker.DeleteState.YES, tracker.isDeleted(procId));
437    }
438    for (int index : nonDeletedProcs) {
439      long procId = procs[index].getProcId();
440      assertEquals("Procedure id : " + procId,
441          ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(procId));
442    }
443  }
444
445  @Test
446  public void testCorruptedTrailersRebuild() throws Exception {
447    final Procedure<?>[] procs = new Procedure[6];
448    for (int i = 0; i < procs.length; ++i) {
449      procs[i] = new TestSequentialProcedure();
450    }
451    // Log State (I=insert, U=updated, D=delete)
452    //   | log 1 | log 2 | log 3 |
453    // 0 | I, D  |       |       |
454    // 1 | I     |       |       |
455    // 2 | I     | D     |       |
456    // 3 | I     | U     |       |
457    // 4 |       | I     | D     |
458    // 5 |       |       | I     |
459    procStore.insert(procs[0], null);
460    procStore.insert(procs[1], null);
461    procStore.insert(procs[2], null);
462    procStore.insert(procs[3], null);
463    procStore.delete(procs[0].getProcId());
464    procStore.rollWriterForTesting();
465    procStore.delete(procs[2].getProcId());
466    procStore.update(procs[3]);
467    procStore.insert(procs[4], null);
468    procStore.rollWriterForTesting();
469    procStore.delete(procs[4].getProcId());
470    procStore.insert(procs[5], null);
471
472    // Stop the store
473    procStore.stop(false);
474
475    // Remove 4 byte from the trailers
476    final FileStatus[] logs = fs.listStatus(logDir);
477    assertEquals(3, logs.length);
478    for (int i = 0; i < logs.length; ++i) {
479      corruptLog(logs[i], 4);
480    }
481
482    // Restart the store (avoid cleaning up the files, to check the rebuilded trackers)
483    htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false);
484    final LoadCounter loader = new LoadCounter();
485    storeRestart(loader);
486    assertEquals(3, loader.getLoadedCount());  // procs 1, 3 and 5
487    assertEquals(0, loader.getCorruptedCount());
488
489    // Check the Trackers
490    final ArrayList<ProcedureWALFile> walFiles = procStore.getActiveLogs();
491    LOG.info("WALs " + walFiles);
492    assertEquals(4, walFiles.size());
493    LOG.info("Checking wal " + walFiles.get(0));
494    assertUpdated(walFiles.get(0).getTracker(), procs, new int[]{0, 1, 2, 3}, new int[] {4, 5});
495    LOG.info("Checking wal " + walFiles.get(1));
496    assertUpdated(walFiles.get(1).getTracker(), procs, new int[]{2, 3, 4}, new int[] {0, 1, 5});
497    LOG.info("Checking wal " + walFiles.get(2));
498    assertUpdated(walFiles.get(2).getTracker(), procs, new int[]{4, 5}, new int[] {0, 1, 2, 3});
499    LOG.info("Checking global tracker ");
500    assertDeleted(procStore.getStoreTracker(), procs, new int[]{0, 2, 4}, new int[] {1, 3, 5});
501  }
502
503  @Test
504  public void testCorruptedEntries() throws Exception {
505    // Insert something
506    for (int i = 0; i < 100; ++i) {
507      procStore.insert(new TestSequentialProcedure(), null);
508    }
509
510    // Stop the store
511    procStore.stop(false);
512
513    // Remove some byte from the log
514    // (enough to cut the trailer and corrupt some entries)
515    FileStatus[] logs = fs.listStatus(logDir);
516    assertEquals(1, logs.length);
517    corruptLog(logs[0], 1823);
518
519    LoadCounter loader = new LoadCounter();
520    storeRestart(loader);
521    assertTrue(procStore.getCorruptedLogs() != null);
522    assertEquals(1, procStore.getCorruptedLogs().size());
523    assertEquals(87, loader.getLoadedCount());
524    assertEquals(0, loader.getCorruptedCount());
525  }
526
527  @Test
528  public void testCorruptedProcedures() throws Exception {
529    // Insert root-procedures
530    TestProcedure[] rootProcs = new TestProcedure[10];
531    for (int i = 1; i <= rootProcs.length; i++) {
532      rootProcs[i-1] = new TestProcedure(i, 0);
533      procStore.insert(rootProcs[i-1], null);
534      rootProcs[i-1].addStackId(0);
535      procStore.update(rootProcs[i-1]);
536    }
537    // insert root-child txn
538    procStore.rollWriterForTesting();
539    for (int i = 1; i <= rootProcs.length; i++) {
540      TestProcedure b = new TestProcedure(rootProcs.length + i, i);
541      rootProcs[i-1].addStackId(1);
542      procStore.insert(rootProcs[i-1], new Procedure[] { b });
543    }
544    // insert child updates
545    procStore.rollWriterForTesting();
546    for (int i = 1; i <= rootProcs.length; i++) {
547      procStore.update(new TestProcedure(rootProcs.length + i, i));
548    }
549
550    // Stop the store
551    procStore.stop(false);
552
553    // the first log was removed,
554    // we have insert-txn and updates in the others so everything is fine
555    FileStatus[] logs = fs.listStatus(logDir);
556    assertEquals(Arrays.toString(logs), 2, logs.length);
557    Arrays.sort(logs, new Comparator<FileStatus>() {
558      @Override
559      public int compare(FileStatus o1, FileStatus o2) {
560        return o1.getPath().getName().compareTo(o2.getPath().getName());
561      }
562    });
563
564    LoadCounter loader = new LoadCounter();
565    storeRestart(loader);
566    assertEquals(rootProcs.length * 2, loader.getLoadedCount());
567    assertEquals(0, loader.getCorruptedCount());
568
569    // Remove the second log, we have lost all the root/parent references
570    fs.delete(logs[0].getPath(), false);
571    loader.reset();
572    storeRestart(loader);
573    assertEquals(0, loader.getLoadedCount());
574    assertEquals(rootProcs.length, loader.getCorruptedCount());
575    for (Procedure<?> proc : loader.getCorrupted()) {
576      assertTrue(proc.toString(), proc.getParentProcId() <= rootProcs.length);
577      assertTrue(proc.toString(),
578        proc.getProcId() > rootProcs.length && proc.getProcId() <= (rootProcs.length * 2));
579    }
580  }
581
582  @Test
583  public void testRollAndRemove() throws IOException {
584    // Insert something in the log
585    Procedure<?> proc1 = new TestSequentialProcedure();
586    procStore.insert(proc1, null);
587
588    Procedure<?> proc2 = new TestSequentialProcedure();
589    procStore.insert(proc2, null);
590
591    // roll the log, now we have 2
592    procStore.rollWriterForTesting();
593    assertEquals(2, procStore.getActiveLogs().size());
594
595    // everything will be up to date in the second log
596    // so we can remove the first one
597    procStore.update(proc1);
598    procStore.update(proc2);
599    assertEquals(1, procStore.getActiveLogs().size());
600
601    // roll the log, now we have 2
602    procStore.rollWriterForTesting();
603    assertEquals(2, procStore.getActiveLogs().size());
604
605    // remove everything active
606    // so we can remove all the logs
607    procStore.delete(proc1.getProcId());
608    procStore.delete(proc2.getProcId());
609    assertEquals(1, procStore.getActiveLogs().size());
610  }
611
612  @Test
613  public void testFileNotFoundDuringLeaseRecovery() throws IOException {
614    final TestProcedure[] procs = new TestProcedure[3];
615    for (int i = 0; i < procs.length; ++i) {
616      procs[i] = new TestProcedure(i + 1, 0);
617      procStore.insert(procs[i], null);
618    }
619    procStore.rollWriterForTesting();
620    for (int i = 0; i < procs.length; ++i) {
621      procStore.update(procs[i]);
622      procStore.rollWriterForTesting();
623    }
624    procStore.stop(false);
625
626    FileStatus[] status = fs.listStatus(logDir);
627    assertEquals(procs.length + 1, status.length);
628
629    // simulate another active master removing the wals
630    procStore = new WALProcedureStore(htu.getConfiguration(), logDir, null,
631        new WALProcedureStore.LeaseRecovery() {
632      private int count = 0;
633
634      @Override
635      public void recoverFileLease(FileSystem fs, Path path) throws IOException {
636        if (++count <= 2) {
637          fs.delete(path, false);
638          LOG.debug("Simulate FileNotFound at count=" + count + " for " + path);
639          throw new FileNotFoundException("test file not found " + path);
640        }
641        LOG.debug("Simulate recoverFileLease() at count=" + count + " for " + path);
642      }
643    });
644
645    final LoadCounter loader = new LoadCounter();
646    procStore.start(PROCEDURE_STORE_SLOTS);
647    procStore.recoverLease();
648    procStore.load(loader);
649    assertEquals(procs.length, loader.getMaxProcId());
650    assertEquals(1, loader.getRunnableCount());
651    assertEquals(0, loader.getCompletedCount());
652    assertEquals(0, loader.getCorruptedCount());
653  }
654
655  @Test
656  public void testLogFileAleadExists() throws IOException {
657    final boolean[] tested = {false};
658    WALProcedureStore mStore = Mockito.spy(procStore);
659
660    Answer<Boolean> ans = new Answer<Boolean>() {
661      @Override
662      public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
663        long logId = ((Long) invocationOnMock.getArgument(0)).longValue();
664        switch ((int) logId) {
665          case 2:
666            // Create a file so that real rollWriter() runs into file exists condition
667            Path logFilePath = mStore.getLogFilePath(logId);
668            mStore.getFileSystem().create(logFilePath);
669            break;
670          case 3:
671            // Success only when we retry with logId 3
672            tested[0] = true;
673          default:
674            break;
675        }
676        return (Boolean) invocationOnMock.callRealMethod();
677      }
678    };
679
680    // First time Store has one log file, next id will be 2
681    Mockito.doAnswer(ans).when(mStore).rollWriter(2);
682    // next time its 3
683    Mockito.doAnswer(ans).when(mStore).rollWriter(3);
684
685    mStore.recoverLease();
686    assertTrue(tested[0]);
687  }
688
689  @Test
690  public void testLoadChildren() throws Exception {
691    TestProcedure a = new TestProcedure(1, 0);
692    TestProcedure b = new TestProcedure(2, 1);
693    TestProcedure c = new TestProcedure(3, 1);
694
695    // INIT
696    procStore.insert(a, null);
697
698    // Run A first step
699    a.addStackId(0);
700    procStore.update(a);
701
702    // Run A second step
703    a.addStackId(1);
704    procStore.insert(a, new Procedure[] { b, c });
705
706    // Run B first step
707    b.addStackId(2);
708    procStore.update(b);
709
710    // Run C first and last step
711    c.addStackId(3);
712    procStore.update(c);
713
714    // Run B second setp
715    b.addStackId(4);
716    procStore.update(b);
717
718    // back to A
719    a.addStackId(5);
720    a.setSuccessState();
721    procStore.delete(a, new long[] { b.getProcId(), c.getProcId() });
722    restartAndAssert(3, 0, 1, 0);
723  }
724
725  @Test
726  public void testBatchDelete() throws Exception {
727    for (int i = 1; i < 10; ++i) {
728      procStore.insert(new TestProcedure(i), null);
729    }
730
731    // delete nothing
732    long[] toDelete = new long[] { 1, 2, 3, 4 };
733    procStore.delete(toDelete, 2, 0);
734    LoadCounter loader = restartAndAssert(9, 9, 0, 0);
735    for (int i = 1; i < 10; ++i) {
736      assertEquals(true, loader.isRunnable(i));
737    }
738
739    // delete the full "toDelete" array (2, 4, 6, 8)
740    toDelete = new long[] { 2, 4, 6, 8 };
741    procStore.delete(toDelete, 0, toDelete.length);
742    loader = restartAndAssert(9, 5, 0, 0);
743    for (int i = 1; i < 10; ++i) {
744      assertEquals(i % 2 != 0, loader.isRunnable(i));
745    }
746
747    // delete a slice of "toDelete" (1, 3)
748    toDelete = new long[] { 5, 7, 1, 3, 9 };
749    procStore.delete(toDelete, 2, 2);
750    loader = restartAndAssert(9, 3, 0, 0);
751    for (int i = 1; i < 10; ++i) {
752      assertEquals(i > 3 && i % 2 != 0, loader.isRunnable(i));
753    }
754
755    // delete a single item (5)
756    toDelete = new long[] { 5 };
757    procStore.delete(toDelete, 0, 1);
758    loader = restartAndAssert(9, 2, 0, 0);
759    for (int i = 1; i < 10; ++i) {
760      assertEquals(i > 5 && i % 2 != 0, loader.isRunnable(i));
761    }
762
763    // delete remaining using a slice of "toDelete" (7, 9)
764    toDelete = new long[] { 0, 7, 9 };
765    procStore.delete(toDelete, 1, 2);
766    loader = restartAndAssert(0, 0, 0, 0);
767    for (int i = 1; i < 10; ++i) {
768      assertEquals(false, loader.isRunnable(i));
769    }
770  }
771
772  @Test
773  public void testBatchInsert() throws Exception {
774    final int count = 10;
775    final TestProcedure[] procs = new TestProcedure[count];
776    for (int i = 0; i < procs.length; ++i) {
777      procs[i] = new TestProcedure(i + 1);
778    }
779    procStore.insert(procs);
780    restartAndAssert(count, count, 0, 0);
781
782    for (int i = 0; i < procs.length; ++i) {
783      final long procId = procs[i].getProcId();
784      procStore.delete(procId);
785      restartAndAssert(procId != count ? count : 0, count - (i + 1), 0, 0);
786    }
787    procStore.removeInactiveLogsForTesting();
788    assertEquals("WALs=" + procStore.getActiveLogs(), 1, procStore.getActiveLogs().size());
789  }
790
791  @Test
792  public void testWALDirAndWALArchiveDir() throws IOException {
793    Configuration conf = htu.getConfiguration();
794    procStore = createWALProcedureStore(conf);
795    assertEquals(procStore.getFileSystem(), procStore.getWalArchiveDir().getFileSystem(conf));
796  }
797
798  private WALProcedureStore createWALProcedureStore(Configuration conf) throws IOException {
799    return new WALProcedureStore(conf, new WALProcedureStore.LeaseRecovery() {
800      @Override
801      public void recoverFileLease(FileSystem fs, Path path) throws IOException {
802        // no-op
803      }
804    });
805  }
806
807  private LoadCounter restartAndAssert(long maxProcId, long runnableCount,
808      int completedCount, int corruptedCount) throws Exception {
809    return ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId,
810      runnableCount, completedCount, corruptedCount);
811  }
812
813  private void corruptLog(final FileStatus logFile, final long dropBytes)
814      throws IOException {
815    assertTrue(logFile.getLen() > dropBytes);
816    LOG.debug("corrupt log " + logFile.getPath() +
817              " size=" + logFile.getLen() + " drop=" + dropBytes);
818    Path tmpPath = new Path(testDir, "corrupted.log");
819    InputStream in = fs.open(logFile.getPath());
820    OutputStream out =  fs.create(tmpPath);
821    IOUtils.copyBytes(in, out, logFile.getLen() - dropBytes, true);
822    if (!fs.rename(tmpPath, logFile.getPath())) {
823      throw new IOException("Unable to rename");
824    }
825  }
826
827  private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
828    LOG.debug("expected: " + procIds);
829    LoadCounter loader = new LoadCounter();
830    storeRestart(loader);
831    assertEquals(procIds.size(), loader.getLoadedCount());
832    assertEquals(0, loader.getCorruptedCount());
833  }
834
835  public static class TestSequentialProcedure extends SequentialProcedure<Void> {
836    private static long seqid = 0;
837
838    public TestSequentialProcedure() {
839      setProcId(++seqid);
840    }
841
842    @Override
843    protected Procedure<Void>[] execute(Void env) {
844      return null;
845    }
846
847    @Override
848    protected void rollback(Void env) {
849    }
850
851    @Override
852    protected boolean abort(Void env) {
853      return false;
854    }
855
856    @Override
857    protected void serializeStateData(ProcedureStateSerializer serializer)
858        throws IOException {
859      long procId = getProcId();
860      if (procId % 2 == 0) {
861        Int64Value.Builder builder = Int64Value.newBuilder().setValue(procId);
862        serializer.serialize(builder.build());
863      }
864    }
865
866    @Override
867    protected void deserializeStateData(ProcedureStateSerializer serializer)
868        throws IOException {
869      long procId = getProcId();
870      if (procId % 2 == 0) {
871        Int64Value value = serializer.deserialize(Int64Value.class);
872        assertEquals(procId, value.getValue());
873      }
874    }
875  }
876}