001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.FileNotFoundException;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.OutputStream;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Comparator;
031import java.util.HashSet;
032import java.util.Set;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileStatus;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.procedure2.Procedure;
041import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
042import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
043import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
044import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
045import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
046import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
047import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
048import org.apache.hadoop.hbase.testclassification.MasterTests;
049import org.apache.hadoop.hbase.testclassification.SmallTests;
050import org.apache.hadoop.io.IOUtils;
051import org.junit.After;
052import org.junit.Before;
053import org.junit.ClassRule;
054import org.junit.Test;
055import org.junit.experimental.categories.Category;
056import org.mockito.Mockito;
057import org.mockito.invocation.InvocationOnMock;
058import org.mockito.stubbing.Answer;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value;
063
064@Category({ MasterTests.class, SmallTests.class })
065public class TestWALProcedureStore {
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068    HBaseClassTestRule.forClass(TestWALProcedureStore.class);
069
070  private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStore.class);
071
072  private static final int PROCEDURE_STORE_SLOTS = 1;
073
074  private WALProcedureStore procStore;
075
076  private HBaseCommonTestingUtility htu;
077  private FileSystem fs;
078  private Path testDir;
079  private Path logDir;
080
081  private void setupConfig(final Configuration conf) {
082    conf.setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, true);
083  }
084
085  @Before
086  public void setUp() throws IOException {
087    htu = new HBaseCommonTestingUtility();
088    testDir = htu.getDataTestDir();
089    htu.getConfiguration().set(HConstants.HBASE_DIR, testDir.toString());
090    fs = testDir.getFileSystem(htu.getConfiguration());
091    htu.getConfiguration().set(HConstants.HBASE_DIR, testDir.toString());
092    assertTrue(testDir.depth() > 1);
093
094    setupConfig(htu.getConfiguration());
095    logDir = new Path(testDir, "proc-logs");
096    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
097    procStore.start(PROCEDURE_STORE_SLOTS);
098    procStore.recoverLease();
099    procStore.load(new LoadCounter());
100  }
101
102  @After
103  public void tearDown() throws IOException {
104    procStore.stop(false);
105    fs.delete(logDir, true);
106  }
107
108  private void storeRestart(ProcedureStore.ProcedureLoader loader) throws Exception {
109    ProcedureTestingUtility.storeRestart(procStore, loader);
110  }
111
112  @Test
113  public void testEmptyRoll() throws Exception {
114    for (int i = 0; i < 10; ++i) {
115      procStore.periodicRollForTesting();
116    }
117    assertEquals(1, procStore.getActiveLogs().size());
118    FileStatus[] status = fs.listStatus(logDir);
119    assertEquals(1, status.length);
120  }
121
122  @Test
123  public void testRestartWithoutData() throws Exception {
124    for (int i = 0; i < 10; ++i) {
125      final LoadCounter loader = new LoadCounter();
126      storeRestart(loader);
127    }
128    LOG.info("ACTIVE WALs " + procStore.getActiveLogs());
129    assertEquals(1, procStore.getActiveLogs().size());
130    FileStatus[] status = fs.listStatus(logDir);
131    assertEquals(1, status.length);
132  }
133
134  /**
135   * Tests that tracker for all old logs are loaded back after procedure store is restarted.
136   */
137  @Test
138  public void trackersLoadedForAllOldLogs() throws Exception {
139    for (int i = 0; i <= 20; ++i) {
140      procStore.insert(new TestProcedure(i), null);
141      if (i > 0 && (i % 5) == 0) {
142        LoadCounter loader = new LoadCounter();
143        storeRestart(loader);
144      }
145    }
146    assertEquals(5, procStore.getActiveLogs().size());
147    for (int i = 0; i < procStore.getActiveLogs().size() - 1; ++i) {
148      ProcedureStoreTracker tracker = procStore.getActiveLogs().get(i).getTracker();
149      assertTrue(tracker != null && !tracker.isEmpty());
150    }
151  }
152
153  @Test
154  public void testWalCleanerSequentialClean() throws Exception {
155    final Procedure<?>[] procs = new Procedure[5];
156    ArrayList<ProcedureWALFile> logs = null;
157
158    // Insert procedures and roll wal after every insert.
159    for (int i = 0; i < procs.length; i++) {
160      procs[i] = new TestSequentialProcedure();
161      procStore.insert(procs[i], null);
162      procStore.rollWriterForTesting();
163      logs = procStore.getActiveLogs();
164      assertEquals(logs.size(), i + 2); // Extra 1 for current ongoing wal.
165    }
166
167    // Delete procedures in sequential order make sure that only the corresponding wal is deleted
168    // from logs list.
169    final int[] deleteOrder = new int[] { 0, 1, 2, 3, 4 };
170    for (int i = 0; i < deleteOrder.length; i++) {
171      procStore.delete(procs[deleteOrder[i]].getProcId());
172      procStore.removeInactiveLogsForTesting();
173      assertFalse(logs.get(deleteOrder[i]).toString(),
174        procStore.getActiveLogs().contains(logs.get(deleteOrder[i])));
175      assertEquals(procStore.getActiveLogs().size(), procs.length - i);
176    }
177  }
178
179  // Test that wal cleaner doesn't create holes in wal files list i.e. it only deletes files if
180  // they are in the starting of the list.
181  @Test
182  public void testWalCleanerNoHoles() throws Exception {
183    final Procedure<?>[] procs = new Procedure[5];
184    ArrayList<ProcedureWALFile> logs = null;
185    // Insert procedures and roll wal after every insert.
186    for (int i = 0; i < procs.length; i++) {
187      procs[i] = new TestSequentialProcedure();
188      procStore.insert(procs[i], null);
189      procStore.rollWriterForTesting();
190      logs = procStore.getActiveLogs();
191      assertEquals(i + 2, logs.size()); // Extra 1 for current ongoing wal.
192    }
193
194    for (int i = 1; i < procs.length; i++) {
195      procStore.delete(procs[i].getProcId());
196    }
197    assertEquals(procs.length + 1, procStore.getActiveLogs().size());
198    procStore.delete(procs[0].getProcId());
199    assertEquals(1, procStore.getActiveLogs().size());
200  }
201
202  @Test
203  public void testWalCleanerUpdates() throws Exception {
204    TestSequentialProcedure p1 = new TestSequentialProcedure();
205    TestSequentialProcedure p2 = new TestSequentialProcedure();
206    procStore.insert(p1, null);
207    procStore.insert(p2, null);
208    procStore.rollWriterForTesting();
209    ProcedureWALFile firstLog = procStore.getActiveLogs().get(0);
210    procStore.update(p1);
211    procStore.rollWriterForTesting();
212    procStore.update(p2);
213    procStore.rollWriterForTesting();
214    procStore.removeInactiveLogsForTesting();
215    assertFalse(procStore.getActiveLogs().contains(firstLog));
216  }
217
218  @Test
219  public void testWalCleanerUpdatesDontLeaveHoles() throws Exception {
220    TestSequentialProcedure p1 = new TestSequentialProcedure();
221    TestSequentialProcedure p2 = new TestSequentialProcedure();
222    procStore.insert(p1, null);
223    procStore.insert(p2, null);
224    procStore.rollWriterForTesting(); // generates first log with p1 + p2
225    ProcedureWALFile log1 = procStore.getActiveLogs().get(0);
226    procStore.update(p2);
227    procStore.rollWriterForTesting(); // generates second log with p2
228    ProcedureWALFile log2 = procStore.getActiveLogs().get(1);
229    procStore.update(p2);
230    procStore.rollWriterForTesting(); // generates third log with p2
231    procStore.removeInactiveLogsForTesting(); // Shouldn't remove 2nd log.
232    assertEquals(4, procStore.getActiveLogs().size());
233    procStore.update(p1);
234    procStore.rollWriterForTesting(); // generates fourth log with p1
235    procStore.removeInactiveLogsForTesting(); // Should remove first two logs.
236    assertEquals(3, procStore.getActiveLogs().size());
237    assertFalse(procStore.getActiveLogs().contains(log1));
238    assertFalse(procStore.getActiveLogs().contains(log2));
239  }
240
241  @Test
242  public void testWalCleanerWithEmptyRolls() throws Exception {
243    final Procedure<?>[] procs = new Procedure[3];
244    for (int i = 0; i < procs.length; ++i) {
245      procs[i] = new TestSequentialProcedure();
246      procStore.insert(procs[i], null);
247    }
248    assertEquals(1, procStore.getActiveLogs().size());
249    procStore.rollWriterForTesting();
250    assertEquals(2, procStore.getActiveLogs().size());
251    procStore.rollWriterForTesting();
252    assertEquals(3, procStore.getActiveLogs().size());
253
254    for (int i = 0; i < procs.length; ++i) {
255      procStore.update(procs[i]);
256      procStore.rollWriterForTesting();
257      procStore.rollWriterForTesting();
258      if (i < (procs.length - 1)) {
259        assertEquals(3 + ((i + 1) * 2), procStore.getActiveLogs().size());
260      }
261    }
262    assertEquals(7, procStore.getActiveLogs().size());
263
264    for (int i = 0; i < procs.length; ++i) {
265      procStore.delete(procs[i].getProcId());
266      assertEquals(7 - ((i + 1) * 2), procStore.getActiveLogs().size());
267    }
268    assertEquals(1, procStore.getActiveLogs().size());
269  }
270
271  @Test
272  public void testEmptyLogLoad() throws Exception {
273    LoadCounter loader = new LoadCounter();
274    storeRestart(loader);
275    assertEquals(0, loader.getMaxProcId());
276    assertEquals(0, loader.getLoadedCount());
277    assertEquals(0, loader.getCorruptedCount());
278  }
279
280  @Test
281  public void testLoad() throws Exception {
282    Set<Long> procIds = new HashSet<>();
283
284    // Insert something in the log
285    Procedure<?> proc1 = new TestSequentialProcedure();
286    procIds.add(proc1.getProcId());
287    procStore.insert(proc1, null);
288
289    Procedure<?> proc2 = new TestSequentialProcedure();
290    Procedure<?>[] child2 = new Procedure[2];
291    child2[0] = new TestSequentialProcedure();
292    child2[1] = new TestSequentialProcedure();
293
294    procIds.add(proc2.getProcId());
295    procIds.add(child2[0].getProcId());
296    procIds.add(child2[1].getProcId());
297    procStore.insert(proc2, child2);
298
299    // Verify that everything is there
300    verifyProcIdsOnRestart(procIds);
301
302    // Update and delete something
303    procStore.update(proc1);
304    procStore.update(child2[1]);
305    procStore.delete(child2[1].getProcId());
306    procIds.remove(child2[1].getProcId());
307
308    // Verify that everything is there
309    verifyProcIdsOnRestart(procIds);
310
311    // Remove 4 byte from the trailers
312    procStore.stop(false);
313    FileStatus[] logs = fs.listStatus(logDir);
314    assertEquals(3, logs.length);
315    for (int i = 0; i < logs.length; ++i) {
316      corruptLog(logs[i], 4);
317    }
318    verifyProcIdsOnRestart(procIds);
319  }
320
321  @Test
322  public void testNoTrailerDoubleRestart() throws Exception {
323    // log-0001: proc 0, 1 and 2 are inserted
324    Procedure<?> proc0 = new TestSequentialProcedure();
325    procStore.insert(proc0, null);
326    Procedure<?> proc1 = new TestSequentialProcedure();
327    procStore.insert(proc1, null);
328    Procedure<?> proc2 = new TestSequentialProcedure();
329    procStore.insert(proc2, null);
330    procStore.rollWriterForTesting();
331
332    // log-0002: proc 1 deleted
333    procStore.delete(proc1.getProcId());
334    procStore.rollWriterForTesting();
335
336    // log-0003: proc 2 is update
337    procStore.update(proc2);
338    procStore.rollWriterForTesting();
339
340    // log-0004: proc 2 deleted
341    procStore.delete(proc2.getProcId());
342
343    // stop the store and remove the trailer
344    procStore.stop(false);
345    FileStatus[] logs = fs.listStatus(logDir);
346    assertEquals(4, logs.length);
347    for (int i = 0; i < logs.length; ++i) {
348      corruptLog(logs[i], 4);
349    }
350
351    // Test Load 1
352    // Restart the store (avoid cleaning up the files, to check the rebuilded trackers)
353    htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false);
354    LoadCounter loader = new LoadCounter();
355    storeRestart(loader);
356    assertEquals(1, loader.getLoadedCount());
357    assertEquals(0, loader.getCorruptedCount());
358
359    // Test Load 2
360    assertEquals(5, fs.listStatus(logDir).length);
361    loader = new LoadCounter();
362    storeRestart(loader);
363    assertEquals(1, loader.getLoadedCount());
364    assertEquals(0, loader.getCorruptedCount());
365
366    // remove proc-0
367    procStore.delete(proc0.getProcId());
368    procStore.periodicRollForTesting();
369    assertEquals(1, fs.listStatus(logDir).length);
370    storeRestart(loader);
371  }
372
373  @Test
374  public void testProcIdHoles() throws Exception {
375    // Insert
376    for (int i = 0; i < 100; i += 2) {
377      procStore.insert(new TestProcedure(i), null);
378      if (i > 0 && (i % 10) == 0) {
379        LoadCounter loader = new LoadCounter();
380        storeRestart(loader);
381        assertEquals(0, loader.getCorruptedCount());
382        assertEquals((i / 2) + 1, loader.getLoadedCount());
383      }
384    }
385    assertEquals(10, procStore.getActiveLogs().size());
386
387    // Delete
388    for (int i = 0; i < 100; i += 2) {
389      procStore.delete(i);
390    }
391    assertEquals(1, procStore.getActiveLogs().size());
392
393    LoadCounter loader = new LoadCounter();
394    storeRestart(loader);
395    assertEquals(0, loader.getLoadedCount());
396    assertEquals(0, loader.getCorruptedCount());
397  }
398
399  @Test
400  public void testCorruptedTrailer() throws Exception {
401    // Insert something
402    for (int i = 0; i < 100; ++i) {
403      procStore.insert(new TestSequentialProcedure(), null);
404    }
405
406    // Stop the store
407    procStore.stop(false);
408
409    // Remove 4 byte from the trailer
410    FileStatus[] logs = fs.listStatus(logDir);
411    assertEquals(1, logs.length);
412    corruptLog(logs[0], 4);
413
414    LoadCounter loader = new LoadCounter();
415    storeRestart(loader);
416    assertEquals(100, loader.getLoadedCount());
417    assertEquals(0, loader.getCorruptedCount());
418  }
419
420  private static void assertUpdated(final ProcedureStoreTracker tracker, final Procedure<?>[] procs,
421    final int[] updatedProcs, final int[] nonUpdatedProcs) {
422    for (int index : updatedProcs) {
423      long procId = procs[index].getProcId();
424      assertTrue("Procedure id : " + procId, tracker.isModified(procId));
425    }
426    for (int index : nonUpdatedProcs) {
427      long procId = procs[index].getProcId();
428      assertFalse("Procedure id : " + procId, tracker.isModified(procId));
429    }
430  }
431
432  private static void assertDeleted(final ProcedureStoreTracker tracker, final Procedure<?>[] procs,
433    final int[] deletedProcs, final int[] nonDeletedProcs) {
434    for (int index : deletedProcs) {
435      long procId = procs[index].getProcId();
436      assertEquals("Procedure id : " + procId, ProcedureStoreTracker.DeleteState.YES,
437        tracker.isDeleted(procId));
438    }
439    for (int index : nonDeletedProcs) {
440      long procId = procs[index].getProcId();
441      assertEquals("Procedure id : " + procId, ProcedureStoreTracker.DeleteState.NO,
442        tracker.isDeleted(procId));
443    }
444  }
445
446  @Test
447  public void testCorruptedTrailersRebuild() throws Exception {
448    final Procedure<?>[] procs = new Procedure[6];
449    for (int i = 0; i < procs.length; ++i) {
450      procs[i] = new TestSequentialProcedure();
451    }
452    // Log State (I=insert, U=updated, D=delete)
453    // | log 1 | log 2 | log 3 |
454    // 0 | I, D | | |
455    // 1 | I | | |
456    // 2 | I | D | |
457    // 3 | I | U | |
458    // 4 | | I | D |
459    // 5 | | | I |
460    procStore.insert(procs[0], null);
461    procStore.insert(procs[1], null);
462    procStore.insert(procs[2], null);
463    procStore.insert(procs[3], null);
464    procStore.delete(procs[0].getProcId());
465    procStore.rollWriterForTesting();
466    procStore.delete(procs[2].getProcId());
467    procStore.update(procs[3]);
468    procStore.insert(procs[4], null);
469    procStore.rollWriterForTesting();
470    procStore.delete(procs[4].getProcId());
471    procStore.insert(procs[5], null);
472
473    // Stop the store
474    procStore.stop(false);
475
476    // Remove 4 byte from the trailers
477    final FileStatus[] logs = fs.listStatus(logDir);
478    assertEquals(3, logs.length);
479    for (int i = 0; i < logs.length; ++i) {
480      corruptLog(logs[i], 4);
481    }
482
483    // Restart the store (avoid cleaning up the files, to check the rebuilded trackers)
484    htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false);
485    final LoadCounter loader = new LoadCounter();
486    storeRestart(loader);
487    assertEquals(3, loader.getLoadedCount()); // procs 1, 3 and 5
488    assertEquals(0, loader.getCorruptedCount());
489
490    // Check the Trackers
491    final ArrayList<ProcedureWALFile> walFiles = procStore.getActiveLogs();
492    LOG.info("WALs " + walFiles);
493    assertEquals(4, walFiles.size());
494    LOG.info("Checking wal " + walFiles.get(0));
495    assertUpdated(walFiles.get(0).getTracker(), procs, new int[] { 0, 1, 2, 3 },
496      new int[] { 4, 5 });
497    LOG.info("Checking wal " + walFiles.get(1));
498    assertUpdated(walFiles.get(1).getTracker(), procs, new int[] { 2, 3, 4 },
499      new int[] { 0, 1, 5 });
500    LOG.info("Checking wal " + walFiles.get(2));
501    assertUpdated(walFiles.get(2).getTracker(), procs, new int[] { 4, 5 },
502      new int[] { 0, 1, 2, 3 });
503    LOG.info("Checking global tracker ");
504    assertDeleted(procStore.getStoreTracker(), procs, new int[] { 0, 2, 4 }, new int[] { 1, 3, 5 });
505  }
506
507  @Test
508  public void testCorruptedEntries() throws Exception {
509    // Insert something
510    for (int i = 0; i < 100; ++i) {
511      procStore.insert(new TestSequentialProcedure(), null);
512    }
513
514    // Stop the store
515    procStore.stop(false);
516
517    // Remove some byte from the log
518    // (enough to cut the trailer and corrupt some entries)
519    FileStatus[] logs = fs.listStatus(logDir);
520    assertEquals(1, logs.length);
521    corruptLog(logs[0], 1823);
522
523    LoadCounter loader = new LoadCounter();
524    storeRestart(loader);
525    assertTrue(procStore.getCorruptedLogs() != null);
526    assertEquals(1, procStore.getCorruptedLogs().size());
527    assertEquals(87, loader.getLoadedCount());
528    assertEquals(0, loader.getCorruptedCount());
529  }
530
531  @Test
532  public void testCorruptedProcedures() throws Exception {
533    // Insert root-procedures
534    TestProcedure[] rootProcs = new TestProcedure[10];
535    for (int i = 1; i <= rootProcs.length; i++) {
536      rootProcs[i - 1] = new TestProcedure(i, 0);
537      procStore.insert(rootProcs[i - 1], null);
538      rootProcs[i - 1].addStackId(0);
539      procStore.update(rootProcs[i - 1]);
540    }
541    // insert root-child txn
542    procStore.rollWriterForTesting();
543    for (int i = 1; i <= rootProcs.length; i++) {
544      TestProcedure b = new TestProcedure(rootProcs.length + i, i);
545      rootProcs[i - 1].addStackId(1);
546      procStore.insert(rootProcs[i - 1], new Procedure[] { b });
547    }
548    // insert child updates
549    procStore.rollWriterForTesting();
550    for (int i = 1; i <= rootProcs.length; i++) {
551      procStore.update(new TestProcedure(rootProcs.length + i, i));
552    }
553
554    // Stop the store
555    procStore.stop(false);
556
557    // the first log was removed,
558    // we have insert-txn and updates in the others so everything is fine
559    FileStatus[] logs = fs.listStatus(logDir);
560    assertEquals(Arrays.toString(logs), 2, logs.length);
561    Arrays.sort(logs, new Comparator<FileStatus>() {
562      @Override
563      public int compare(FileStatus o1, FileStatus o2) {
564        return o1.getPath().getName().compareTo(o2.getPath().getName());
565      }
566    });
567
568    LoadCounter loader = new LoadCounter();
569    storeRestart(loader);
570    assertEquals(rootProcs.length * 2, loader.getLoadedCount());
571    assertEquals(0, loader.getCorruptedCount());
572
573    // Remove the second log, we have lost all the root/parent references
574    fs.delete(logs[0].getPath(), false);
575    loader.reset();
576    storeRestart(loader);
577    assertEquals(0, loader.getLoadedCount());
578    assertEquals(rootProcs.length, loader.getCorruptedCount());
579    for (Procedure<?> proc : loader.getCorrupted()) {
580      assertTrue(proc.toString(), proc.getParentProcId() <= rootProcs.length);
581      assertTrue(proc.toString(),
582        proc.getProcId() > rootProcs.length && proc.getProcId() <= (rootProcs.length * 2));
583    }
584  }
585
586  @Test
587  public void testRollAndRemove() throws IOException {
588    // Insert something in the log
589    Procedure<?> proc1 = new TestSequentialProcedure();
590    procStore.insert(proc1, null);
591
592    Procedure<?> proc2 = new TestSequentialProcedure();
593    procStore.insert(proc2, null);
594
595    // roll the log, now we have 2
596    procStore.rollWriterForTesting();
597    assertEquals(2, procStore.getActiveLogs().size());
598
599    // everything will be up to date in the second log
600    // so we can remove the first one
601    procStore.update(proc1);
602    procStore.update(proc2);
603    assertEquals(1, procStore.getActiveLogs().size());
604
605    // roll the log, now we have 2
606    procStore.rollWriterForTesting();
607    assertEquals(2, procStore.getActiveLogs().size());
608
609    // remove everything active
610    // so we can remove all the logs
611    procStore.delete(proc1.getProcId());
612    procStore.delete(proc2.getProcId());
613    assertEquals(1, procStore.getActiveLogs().size());
614  }
615
616  @Test
617  public void testFileNotFoundDuringLeaseRecovery() throws IOException {
618    final TestProcedure[] procs = new TestProcedure[3];
619    for (int i = 0; i < procs.length; ++i) {
620      procs[i] = new TestProcedure(i + 1, 0);
621      procStore.insert(procs[i], null);
622    }
623    procStore.rollWriterForTesting();
624    for (int i = 0; i < procs.length; ++i) {
625      procStore.update(procs[i]);
626      procStore.rollWriterForTesting();
627    }
628    procStore.stop(false);
629
630    FileStatus[] status = fs.listStatus(logDir);
631    assertEquals(procs.length + 1, status.length);
632
633    // simulate another active master removing the wals
634    procStore = new WALProcedureStore(htu.getConfiguration(), logDir, null, new LeaseRecovery() {
635      private int count = 0;
636
637      @Override
638      public void recoverFileLease(FileSystem fs, Path path) throws IOException {
639        if (++count <= 2) {
640          fs.delete(path, false);
641          LOG.debug("Simulate FileNotFound at count=" + count + " for " + path);
642          throw new FileNotFoundException("test file not found " + path);
643        }
644        LOG.debug("Simulate recoverFileLease() at count=" + count + " for " + path);
645      }
646    });
647
648    final LoadCounter loader = new LoadCounter();
649    procStore.start(PROCEDURE_STORE_SLOTS);
650    procStore.recoverLease();
651    procStore.load(loader);
652    assertEquals(procs.length, loader.getMaxProcId());
653    assertEquals(1, loader.getRunnableCount());
654    assertEquals(0, loader.getCompletedCount());
655    assertEquals(0, loader.getCorruptedCount());
656  }
657
658  @Test
659  public void testLogFileAlreadyExists() throws IOException {
660    final boolean[] tested = { false };
661    WALProcedureStore mStore = Mockito.spy(procStore);
662
663    Answer<Boolean> ans = new Answer<Boolean>() {
664      @Override
665      public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
666        long logId = ((Long) invocationOnMock.getArgument(0)).longValue();
667        switch ((int) logId) {
668          case 2:
669            // Create a file so that real rollWriter() runs into file exists condition
670            Path logFilePath = mStore.getLogFilePath(logId);
671            mStore.getFileSystem().create(logFilePath);
672            break;
673          case 3:
674            // Success only when we retry with logId 3
675            tested[0] = true;
676          default:
677            break;
678        }
679        return (Boolean) invocationOnMock.callRealMethod();
680      }
681    };
682
683    // First time Store has one log file, next id will be 2
684    Mockito.doAnswer(ans).when(mStore).rollWriter(2);
685    // next time its 3
686    Mockito.doAnswer(ans).when(mStore).rollWriter(3);
687
688    mStore.recoverLease();
689    assertTrue(tested[0]);
690  }
691
692  @Test
693  public void testLoadChildren() throws Exception {
694    TestProcedure a = new TestProcedure(1, 0);
695    TestProcedure b = new TestProcedure(2, 1);
696    TestProcedure c = new TestProcedure(3, 1);
697
698    // INIT
699    procStore.insert(a, null);
700
701    // Run A first step
702    a.addStackId(0);
703    procStore.update(a);
704
705    // Run A second step
706    a.addStackId(1);
707    procStore.insert(a, new Procedure[] { b, c });
708
709    // Run B first step
710    b.addStackId(2);
711    procStore.update(b);
712
713    // Run C first and last step
714    c.addStackId(3);
715    procStore.update(c);
716
717    // Run B second setp
718    b.addStackId(4);
719    procStore.update(b);
720
721    // back to A
722    a.addStackId(5);
723    a.setSuccessState();
724    procStore.delete(a, new long[] { b.getProcId(), c.getProcId() });
725    restartAndAssert(3, 0, 1, 0);
726  }
727
728  @Test
729  public void testBatchDelete() throws Exception {
730    for (int i = 1; i < 10; ++i) {
731      procStore.insert(new TestProcedure(i), null);
732    }
733
734    // delete nothing
735    long[] toDelete = new long[] { 1, 2, 3, 4 };
736    procStore.delete(toDelete, 2, 0);
737    LoadCounter loader = restartAndAssert(9, 9, 0, 0);
738    for (int i = 1; i < 10; ++i) {
739      assertEquals(true, loader.isRunnable(i));
740    }
741
742    // delete the full "toDelete" array (2, 4, 6, 8)
743    toDelete = new long[] { 2, 4, 6, 8 };
744    procStore.delete(toDelete, 0, toDelete.length);
745    loader = restartAndAssert(9, 5, 0, 0);
746    for (int i = 1; i < 10; ++i) {
747      assertEquals(i % 2 != 0, loader.isRunnable(i));
748    }
749
750    // delete a slice of "toDelete" (1, 3)
751    toDelete = new long[] { 5, 7, 1, 3, 9 };
752    procStore.delete(toDelete, 2, 2);
753    loader = restartAndAssert(9, 3, 0, 0);
754    for (int i = 1; i < 10; ++i) {
755      assertEquals(i > 3 && i % 2 != 0, loader.isRunnable(i));
756    }
757
758    // delete a single item (5)
759    toDelete = new long[] { 5 };
760    procStore.delete(toDelete, 0, 1);
761    loader = restartAndAssert(9, 2, 0, 0);
762    for (int i = 1; i < 10; ++i) {
763      assertEquals(i > 5 && i % 2 != 0, loader.isRunnable(i));
764    }
765
766    // delete remaining using a slice of "toDelete" (7, 9)
767    toDelete = new long[] { 0, 7, 9 };
768    procStore.delete(toDelete, 1, 2);
769    loader = restartAndAssert(0, 0, 0, 0);
770    for (int i = 1; i < 10; ++i) {
771      assertEquals(false, loader.isRunnable(i));
772    }
773  }
774
775  @Test
776  public void testBatchInsert() throws Exception {
777    final int count = 10;
778    final TestProcedure[] procs = new TestProcedure[count];
779    for (int i = 0; i < procs.length; ++i) {
780      procs[i] = new TestProcedure(i + 1);
781    }
782    procStore.insert(procs);
783    restartAndAssert(count, count, 0, 0);
784
785    for (int i = 0; i < procs.length; ++i) {
786      final long procId = procs[i].getProcId();
787      procStore.delete(procId);
788      restartAndAssert(procId != count ? count : 0, count - (i + 1), 0, 0);
789    }
790    procStore.removeInactiveLogsForTesting();
791    assertEquals("WALs=" + procStore.getActiveLogs(), 1, procStore.getActiveLogs().size());
792  }
793
794  @Test
795  public void testWALDirAndWALArchiveDir() throws IOException {
796    Configuration conf = htu.getConfiguration();
797    procStore = createWALProcedureStore(conf);
798    assertEquals(procStore.getFileSystem(), procStore.getWalArchiveDir().getFileSystem(conf));
799  }
800
801  private WALProcedureStore createWALProcedureStore(Configuration conf) throws IOException {
802    return new WALProcedureStore(conf, new LeaseRecovery() {
803      @Override
804      public void recoverFileLease(FileSystem fs, Path path) throws IOException {
805        // no-op
806      }
807    });
808  }
809
810  private LoadCounter restartAndAssert(long maxProcId, long runnableCount, int completedCount,
811    int corruptedCount) throws Exception {
812    return ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId, runnableCount,
813      completedCount, corruptedCount);
814  }
815
816  private void corruptLog(final FileStatus logFile, final long dropBytes) throws IOException {
817    assertTrue(logFile.getLen() > dropBytes);
818    LOG.debug(
819      "corrupt log " + logFile.getPath() + " size=" + logFile.getLen() + " drop=" + dropBytes);
820    Path tmpPath = new Path(testDir, "corrupted.log");
821    InputStream in = fs.open(logFile.getPath());
822    OutputStream out = fs.create(tmpPath);
823    IOUtils.copyBytes(in, out, logFile.getLen() - dropBytes, true);
824    if (!fs.rename(tmpPath, logFile.getPath())) {
825      throw new IOException("Unable to rename");
826    }
827  }
828
829  private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
830    LOG.debug("expected: " + procIds);
831    LoadCounter loader = new LoadCounter();
832    storeRestart(loader);
833    assertEquals(procIds.size(), loader.getLoadedCount());
834    assertEquals(0, loader.getCorruptedCount());
835  }
836
837  public static class TestSequentialProcedure extends SequentialProcedure<Void> {
838    private static long seqid = 0;
839
840    public TestSequentialProcedure() {
841      setProcId(++seqid);
842    }
843
844    @Override
845    protected Procedure<Void>[] execute(Void env) {
846      return null;
847    }
848
849    @Override
850    protected void rollback(Void env) {
851    }
852
853    @Override
854    protected boolean abort(Void env) {
855      return false;
856    }
857
858    @Override
859    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
860      long procId = getProcId();
861      if (procId % 2 == 0) {
862        Int64Value.Builder builder = Int64Value.newBuilder().setValue(procId);
863        serializer.serialize(builder.build());
864      }
865    }
866
867    @Override
868    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
869      long procId = getProcId();
870      if (procId % 2 == 0) {
871        Int64Value value = serializer.deserialize(Int64Value.class);
872        assertEquals(procId, value.getValue());
873      }
874    }
875  }
876}