001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertNotNull;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024
025import java.io.FileNotFoundException;
026import java.io.IOException;
027import java.io.InputStream;
028import java.io.OutputStream;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.Comparator;
032import java.util.HashSet;
033import java.util.Set;
034import java.util.concurrent.atomic.AtomicLong;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FileStatus;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.procedure2.Procedure;
042import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
043import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
044import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
045import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
046import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
047import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
048import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
049import org.apache.hadoop.hbase.testclassification.MasterTests;
050import org.apache.hadoop.hbase.testclassification.SmallTests;
051import org.apache.hadoop.io.IOUtils;
052import org.junit.jupiter.api.AfterEach;
053import org.junit.jupiter.api.BeforeEach;
054import org.junit.jupiter.api.Tag;
055import org.junit.jupiter.api.Test;
056import org.mockito.Mockito;
057import org.mockito.invocation.InvocationOnMock;
058import org.mockito.stubbing.Answer;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value;
063
064@Tag(MasterTests.TAG)
065@Tag(SmallTests.TAG)
066public class TestWALProcedureStore {
067
068  private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStore.class);
069
070  private static final int PROCEDURE_STORE_SLOTS = 1;
071
072  private WALProcedureStore procStore;
073
074  private final HBaseCommonTestingUtil htu = new HBaseCommonTestingUtil();
075  private FileSystem fs;
076  private Path testDir;
077  private Path logDir;
078
079  private void setupConfig(final Configuration conf) {
080    conf.setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, true);
081  }
082
083  @BeforeEach
084  public void setUp() throws IOException {
085    testDir = htu.getDataTestDir();
086    htu.getConfiguration().set(HConstants.HBASE_DIR, testDir.toString());
087    fs = testDir.getFileSystem(htu.getConfiguration());
088    htu.getConfiguration().set(HConstants.HBASE_DIR, testDir.toString());
089    assertTrue(testDir.depth() > 1);
090
091    TestSequentialProcedure.seqId.set(0);
092    setupConfig(htu.getConfiguration());
093    logDir = new Path(testDir, "proc-logs");
094    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
095    procStore.start(PROCEDURE_STORE_SLOTS);
096    procStore.recoverLease();
097    procStore.load(new LoadCounter());
098  }
099
100  @AfterEach
101  public void tearDown() throws IOException {
102    procStore.stop(false);
103    fs.delete(logDir, true);
104  }
105
106  private void storeRestart(ProcedureStore.ProcedureLoader loader) throws Exception {
107    ProcedureTestingUtility.storeRestart(procStore, loader);
108  }
109
110  @Test
111  public void testEmptyRoll() throws Exception {
112    for (int i = 0; i < 10; ++i) {
113      procStore.periodicRollForTesting();
114    }
115    assertEquals(1, procStore.getActiveLogs().size());
116    FileStatus[] status = fs.listStatus(logDir);
117    assertEquals(1, status.length);
118  }
119
120  @Test
121  public void testRestartWithoutData() throws Exception {
122    for (int i = 0; i < 10; ++i) {
123      final LoadCounter loader = new LoadCounter();
124      storeRestart(loader);
125    }
126    LOG.info("ACTIVE WALs " + procStore.getActiveLogs());
127    assertEquals(1, procStore.getActiveLogs().size());
128    FileStatus[] status = fs.listStatus(logDir);
129    assertEquals(1, status.length);
130  }
131
132  /**
133   * Tests that tracker for all old logs are loaded back after procedure store is restarted.
134   */
135  @Test
136  public void trackersLoadedForAllOldLogs() throws Exception {
137    for (int i = 0; i <= 20; ++i) {
138      procStore.insert(new TestProcedure(i), null);
139      if (i > 0 && (i % 5) == 0) {
140        LoadCounter loader = new LoadCounter();
141        storeRestart(loader);
142      }
143    }
144    assertEquals(5, procStore.getActiveLogs().size());
145    for (int i = 0; i < procStore.getActiveLogs().size() - 1; ++i) {
146      ProcedureStoreTracker tracker = procStore.getActiveLogs().get(i).getTracker();
147      assertTrue(tracker != null && !tracker.isEmpty());
148    }
149  }
150
151  @Test
152  public void testWalCleanerSequentialClean() throws Exception {
153    final Procedure<?>[] procs = new Procedure[5];
154    ArrayList<ProcedureWALFile> logs = null;
155
156    // Insert procedures and roll wal after every insert.
157    for (int i = 0; i < procs.length; i++) {
158      procs[i] = new TestSequentialProcedure();
159      procStore.insert(procs[i], null);
160      procStore.rollWriterForTesting();
161      logs = procStore.getActiveLogs();
162      assertEquals(logs.size(), i + 2); // Extra 1 for current ongoing wal.
163    }
164
165    // Delete procedures in sequential order make sure that only the corresponding wal is deleted
166    // from logs list.
167    final int[] deleteOrder = new int[] { 0, 1, 2, 3, 4 };
168    for (int i = 0; i < deleteOrder.length; i++) {
169      procStore.delete(procs[deleteOrder[i]].getProcId());
170      procStore.removeInactiveLogsForTesting();
171      assertFalse(procStore.getActiveLogs().contains(logs.get(deleteOrder[i])),
172        logs.get(deleteOrder[i]).toString());
173      assertEquals(procStore.getActiveLogs().size(), procs.length - i);
174    }
175  }
176
177  // Test that wal cleaner doesn't create holes in wal files list i.e. it only deletes files if
178  // they are in the starting of the list.
179  @Test
180  public void testWalCleanerNoHoles() throws Exception {
181    final Procedure<?>[] procs = new Procedure[5];
182    ArrayList<ProcedureWALFile> logs = null;
183    // Insert procedures and roll wal after every insert.
184    for (int i = 0; i < procs.length; i++) {
185      procs[i] = new TestSequentialProcedure();
186      procStore.insert(procs[i], null);
187      procStore.rollWriterForTesting();
188      logs = procStore.getActiveLogs();
189      assertEquals(i + 2, logs.size()); // Extra 1 for current ongoing wal.
190    }
191
192    for (int i = 1; i < procs.length; i++) {
193      procStore.delete(procs[i].getProcId());
194    }
195    assertEquals(procs.length + 1, procStore.getActiveLogs().size());
196    procStore.delete(procs[0].getProcId());
197    assertEquals(1, procStore.getActiveLogs().size());
198  }
199
200  @Test
201  public void testWalCleanerUpdates() throws Exception {
202    TestSequentialProcedure p1 = new TestSequentialProcedure();
203    TestSequentialProcedure p2 = new TestSequentialProcedure();
204    procStore.insert(p1, null);
205    procStore.insert(p2, null);
206    procStore.rollWriterForTesting();
207    ProcedureWALFile firstLog = procStore.getActiveLogs().get(0);
208    procStore.update(p1);
209    procStore.rollWriterForTesting();
210    procStore.update(p2);
211    procStore.rollWriterForTesting();
212    procStore.removeInactiveLogsForTesting();
213    assertFalse(procStore.getActiveLogs().contains(firstLog));
214  }
215
216  @Test
217  public void testWalCleanerUpdatesDontLeaveHoles() throws Exception {
218    TestSequentialProcedure p1 = new TestSequentialProcedure();
219    TestSequentialProcedure p2 = new TestSequentialProcedure();
220    procStore.insert(p1, null);
221    procStore.insert(p2, null);
222    procStore.rollWriterForTesting(); // generates first log with p1 + p2
223    ProcedureWALFile log1 = procStore.getActiveLogs().get(0);
224    procStore.update(p2);
225    procStore.rollWriterForTesting(); // generates second log with p2
226    ProcedureWALFile log2 = procStore.getActiveLogs().get(1);
227    procStore.update(p2);
228    procStore.rollWriterForTesting(); // generates third log with p2
229    procStore.removeInactiveLogsForTesting(); // Shouldn't remove 2nd log.
230    assertEquals(4, procStore.getActiveLogs().size());
231    procStore.update(p1);
232    procStore.rollWriterForTesting(); // generates fourth log with p1
233    procStore.removeInactiveLogsForTesting(); // Should remove first two logs.
234    assertEquals(3, procStore.getActiveLogs().size());
235    assertFalse(procStore.getActiveLogs().contains(log1));
236    assertFalse(procStore.getActiveLogs().contains(log2));
237  }
238
239  @Test
240  public void testWalCleanerWithEmptyRolls() throws Exception {
241    final Procedure<?>[] procs = new Procedure[3];
242    for (int i = 0; i < procs.length; ++i) {
243      procs[i] = new TestSequentialProcedure();
244      procStore.insert(procs[i], null);
245    }
246    assertEquals(1, procStore.getActiveLogs().size());
247    procStore.rollWriterForTesting();
248    assertEquals(2, procStore.getActiveLogs().size());
249    procStore.rollWriterForTesting();
250    assertEquals(3, procStore.getActiveLogs().size());
251
252    for (int i = 0; i < procs.length; ++i) {
253      procStore.update(procs[i]);
254      procStore.rollWriterForTesting();
255      procStore.rollWriterForTesting();
256      if (i < (procs.length - 1)) {
257        assertEquals(3 + ((i + 1) * 2), procStore.getActiveLogs().size());
258      }
259    }
260    assertEquals(7, procStore.getActiveLogs().size());
261
262    for (int i = 0; i < procs.length; ++i) {
263      procStore.delete(procs[i].getProcId());
264      assertEquals(7 - ((i + 1) * 2), procStore.getActiveLogs().size());
265    }
266    assertEquals(1, procStore.getActiveLogs().size());
267  }
268
269  @Test
270  public void testEmptyLogLoad() throws Exception {
271    LoadCounter loader = new LoadCounter();
272    storeRestart(loader);
273    assertEquals(0, loader.getMaxProcId());
274    assertEquals(0, loader.getLoadedCount());
275    assertEquals(0, loader.getCorruptedCount());
276  }
277
278  @Test
279  public void testLoad() throws Exception {
280    Set<Long> procIds = new HashSet<>();
281
282    // Insert something in the log
283    Procedure<?> proc1 = new TestSequentialProcedure();
284    procIds.add(proc1.getProcId());
285    procStore.insert(proc1, null);
286
287    Procedure<?> proc2 = new TestSequentialProcedure();
288    Procedure<?>[] child2 = new Procedure[2];
289    child2[0] = new TestSequentialProcedure();
290    child2[1] = new TestSequentialProcedure();
291
292    procIds.add(proc2.getProcId());
293    procIds.add(child2[0].getProcId());
294    procIds.add(child2[1].getProcId());
295    procStore.insert(proc2, child2);
296
297    // Verify that everything is there
298    verifyProcIdsOnRestart(procIds);
299
300    // Update and delete something
301    procStore.update(proc1);
302    procStore.update(child2[1]);
303    procStore.delete(child2[1].getProcId());
304    procIds.remove(child2[1].getProcId());
305
306    // Verify that everything is there
307    verifyProcIdsOnRestart(procIds);
308
309    // Remove 4 byte from the trailers
310    procStore.stop(false);
311    FileStatus[] logs = fs.listStatus(logDir);
312    assertEquals(3, logs.length);
313    for (int i = 0; i < logs.length; ++i) {
314      corruptLog(logs[i], 4);
315    }
316    verifyProcIdsOnRestart(procIds);
317  }
318
319  @Test
320  public void testNoTrailerDoubleRestart() throws Exception {
321    // log-0001: proc 0, 1 and 2 are inserted
322    Procedure<?> proc0 = new TestSequentialProcedure();
323    procStore.insert(proc0, null);
324    Procedure<?> proc1 = new TestSequentialProcedure();
325    procStore.insert(proc1, null);
326    Procedure<?> proc2 = new TestSequentialProcedure();
327    procStore.insert(proc2, null);
328    procStore.rollWriterForTesting();
329
330    // log-0002: proc 1 deleted
331    procStore.delete(proc1.getProcId());
332    procStore.rollWriterForTesting();
333
334    // log-0003: proc 2 is update
335    procStore.update(proc2);
336    procStore.rollWriterForTesting();
337
338    // log-0004: proc 2 deleted
339    procStore.delete(proc2.getProcId());
340
341    // stop the store and remove the trailer
342    procStore.stop(false);
343    FileStatus[] logs = fs.listStatus(logDir);
344    assertEquals(4, logs.length);
345    for (int i = 0; i < logs.length; ++i) {
346      corruptLog(logs[i], 4);
347    }
348
349    // Test Load 1
350    // Restart the store (avoid cleaning up the files, to check the rebuilded trackers)
351    htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false);
352    LoadCounter loader = new LoadCounter();
353    storeRestart(loader);
354    assertEquals(1, loader.getLoadedCount());
355    assertEquals(0, loader.getCorruptedCount());
356
357    // Test Load 2
358    assertEquals(5, fs.listStatus(logDir).length);
359    loader = new LoadCounter();
360    storeRestart(loader);
361    assertEquals(1, loader.getLoadedCount());
362    assertEquals(0, loader.getCorruptedCount());
363
364    // remove proc-0
365    procStore.delete(proc0.getProcId());
366    procStore.periodicRollForTesting();
367    assertEquals(1, fs.listStatus(logDir).length);
368    storeRestart(loader);
369  }
370
371  @Test
372  public void testProcIdHoles() throws Exception {
373    // Insert
374    for (int i = 0; i < 100; i += 2) {
375      procStore.insert(new TestProcedure(i), null);
376      if (i > 0 && (i % 10) == 0) {
377        LoadCounter loader = new LoadCounter();
378        storeRestart(loader);
379        assertEquals(0, loader.getCorruptedCount());
380        assertEquals((i / 2) + 1, loader.getLoadedCount());
381      }
382    }
383    assertEquals(10, procStore.getActiveLogs().size());
384
385    // Delete
386    for (int i = 0; i < 100; i += 2) {
387      procStore.delete(i);
388    }
389    assertEquals(1, procStore.getActiveLogs().size());
390
391    LoadCounter loader = new LoadCounter();
392    storeRestart(loader);
393    assertEquals(0, loader.getLoadedCount());
394    assertEquals(0, loader.getCorruptedCount());
395  }
396
397  @Test
398  public void testCorruptedTrailer() throws Exception {
399    // Insert something
400    for (int i = 0; i < 100; ++i) {
401      procStore.insert(new TestSequentialProcedure(), null);
402    }
403
404    // Stop the store
405    procStore.stop(false);
406
407    // Remove 4 byte from the trailer
408    FileStatus[] logs = fs.listStatus(logDir);
409    assertEquals(1, logs.length);
410    corruptLog(logs[0], 4);
411
412    LoadCounter loader = new LoadCounter();
413    storeRestart(loader);
414    assertEquals(100, loader.getLoadedCount());
415    assertEquals(0, loader.getCorruptedCount());
416  }
417
418  private static void assertUpdated(final ProcedureStoreTracker tracker, final Procedure<?>[] procs,
419    final int[] updatedProcs, final int[] nonUpdatedProcs) {
420    for (int index : updatedProcs) {
421      long procId = procs[index].getProcId();
422      assertTrue(tracker.isModified(procId), "Procedure id : " + procId);
423    }
424    for (int index : nonUpdatedProcs) {
425      long procId = procs[index].getProcId();
426      assertFalse(tracker.isModified(procId), "Procedure id : " + procId);
427    }
428  }
429
430  private static void assertDeleted(final ProcedureStoreTracker tracker, final Procedure<?>[] procs,
431    final int[] deletedProcs, final int[] nonDeletedProcs) {
432    for (int index : deletedProcs) {
433      long procId = procs[index].getProcId();
434      assertEquals(ProcedureStoreTracker.DeleteState.YES, tracker.isDeleted(procId),
435        "Procedure id : " + procId);
436    }
437    for (int index : nonDeletedProcs) {
438      long procId = procs[index].getProcId();
439      assertEquals(ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(procId),
440        "Procedure id : " + procId);
441    }
442  }
443
444  @Test
445  public void testCorruptedTrailersRebuild() throws Exception {
446    final Procedure<?>[] procs = new Procedure[6];
447    for (int i = 0; i < procs.length; ++i) {
448      procs[i] = new TestSequentialProcedure();
449    }
450    // Log State (I=insert, U=updated, D=delete)
451    // | log 1 | log 2 | log 3 |
452    // 0 | I, D | | |
453    // 1 | I | | |
454    // 2 | I | D | |
455    // 3 | I | U | |
456    // 4 | | I | D |
457    // 5 | | | I |
458    procStore.insert(procs[0], null);
459    procStore.insert(procs[1], null);
460    procStore.insert(procs[2], null);
461    procStore.insert(procs[3], null);
462    procStore.delete(procs[0].getProcId());
463    procStore.rollWriterForTesting();
464    procStore.delete(procs[2].getProcId());
465    procStore.update(procs[3]);
466    procStore.insert(procs[4], null);
467    procStore.rollWriterForTesting();
468    procStore.delete(procs[4].getProcId());
469    procStore.insert(procs[5], null);
470
471    // Stop the store
472    procStore.stop(false);
473
474    // Remove 4 byte from the trailers
475    final FileStatus[] logs = fs.listStatus(logDir);
476    assertEquals(3, logs.length);
477    for (int i = 0; i < logs.length; ++i) {
478      corruptLog(logs[i], 4);
479    }
480
481    // Restart the store (avoid cleaning up the files, to check the rebuilded trackers)
482    htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false);
483    final LoadCounter loader = new LoadCounter();
484    storeRestart(loader);
485    assertEquals(3, loader.getLoadedCount()); // procs 1, 3 and 5
486    assertEquals(0, loader.getCorruptedCount());
487
488    // Check the Trackers
489    final ArrayList<ProcedureWALFile> walFiles = procStore.getActiveLogs();
490    LOG.info("WALs " + walFiles);
491    assertEquals(4, walFiles.size());
492    LOG.info("Checking wal " + walFiles.get(0));
493    assertUpdated(walFiles.get(0).getTracker(), procs, new int[] { 0, 1, 2, 3 },
494      new int[] { 4, 5 });
495    LOG.info("Checking wal " + walFiles.get(1));
496    assertUpdated(walFiles.get(1).getTracker(), procs, new int[] { 2, 3, 4 },
497      new int[] { 0, 1, 5 });
498    LOG.info("Checking wal " + walFiles.get(2));
499    assertUpdated(walFiles.get(2).getTracker(), procs, new int[] { 4, 5 },
500      new int[] { 0, 1, 2, 3 });
501    LOG.info("Checking global tracker ");
502    assertDeleted(procStore.getStoreTracker(), procs, new int[] { 0, 2, 4 }, new int[] { 1, 3, 5 });
503  }
504
505  @Test
506  public void testCorruptedEntries() throws Exception {
507    // Insert something
508    for (int i = 0; i < 100; ++i) {
509      procStore.insert(new TestSequentialProcedure(), null);
510    }
511
512    // Stop the store
513    procStore.stop(false);
514
515    // Remove some byte from the log
516    // (enough to cut the trailer and corrupt some entries)
517    FileStatus[] logs = fs.listStatus(logDir);
518    assertEquals(1, logs.length);
519    corruptLog(logs[0], 1823);
520
521    LoadCounter loader = new LoadCounter();
522    storeRestart(loader);
523    assertNotNull(procStore.getCorruptedLogs());
524    assertEquals(1, procStore.getCorruptedLogs().size());
525    assertEquals(87, loader.getLoadedCount());
526    assertEquals(0, loader.getCorruptedCount());
527  }
528
529  @Test
530  public void testCorruptedProcedures() throws Exception {
531    // Insert root-procedures
532    TestProcedure[] rootProcs = new TestProcedure[10];
533    for (int i = 1; i <= rootProcs.length; i++) {
534      rootProcs[i - 1] = new TestProcedure(i, 0);
535      procStore.insert(rootProcs[i - 1], null);
536      rootProcs[i - 1].addStackId(0);
537      procStore.update(rootProcs[i - 1]);
538    }
539    // insert root-child txn
540    procStore.rollWriterForTesting();
541    for (int i = 1; i <= rootProcs.length; i++) {
542      TestProcedure b = new TestProcedure(rootProcs.length + i, i);
543      rootProcs[i - 1].addStackId(1);
544      procStore.insert(rootProcs[i - 1], new Procedure[] { b });
545    }
546    // insert child updates
547    procStore.rollWriterForTesting();
548    for (int i = 1; i <= rootProcs.length; i++) {
549      procStore.update(new TestProcedure(rootProcs.length + i, i));
550    }
551
552    // Stop the store
553    procStore.stop(false);
554
555    // the first log was removed,
556    // we have insert-txn and updates in the others so everything is fine
557    FileStatus[] logs = fs.listStatus(logDir);
558    assertEquals(2, logs.length, Arrays.toString(logs));
559    Arrays.sort(logs, new Comparator<FileStatus>() {
560      @Override
561      public int compare(FileStatus o1, FileStatus o2) {
562        return o1.getPath().getName().compareTo(o2.getPath().getName());
563      }
564    });
565
566    LoadCounter loader = new LoadCounter();
567    storeRestart(loader);
568    assertEquals(rootProcs.length * 2, loader.getLoadedCount());
569    assertEquals(0, loader.getCorruptedCount());
570
571    // Remove the second log, we have lost all the root/parent references
572    fs.delete(logs[0].getPath(), false);
573    loader.reset();
574    storeRestart(loader);
575    assertEquals(0, loader.getLoadedCount());
576    assertEquals(rootProcs.length, loader.getCorruptedCount());
577    for (Procedure<?> proc : loader.getCorrupted()) {
578      assertTrue(proc.getParentProcId() <= rootProcs.length, proc.toString());
579      assertTrue(proc.getProcId() > rootProcs.length && proc.getProcId() <= (rootProcs.length * 2),
580        proc.toString());
581    }
582  }
583
584  @Test
585  public void testRollAndRemove() throws IOException {
586    // Insert something in the log
587    Procedure<?> proc1 = new TestSequentialProcedure();
588    procStore.insert(proc1, null);
589
590    Procedure<?> proc2 = new TestSequentialProcedure();
591    procStore.insert(proc2, null);
592
593    // roll the log, now we have 2
594    procStore.rollWriterForTesting();
595    assertEquals(2, procStore.getActiveLogs().size());
596
597    // everything will be up to date in the second log
598    // so we can remove the first one
599    procStore.update(proc1);
600    procStore.update(proc2);
601    assertEquals(1, procStore.getActiveLogs().size());
602
603    // roll the log, now we have 2
604    procStore.rollWriterForTesting();
605    assertEquals(2, procStore.getActiveLogs().size());
606
607    // remove everything active
608    // so we can remove all the logs
609    procStore.delete(proc1.getProcId());
610    procStore.delete(proc2.getProcId());
611    assertEquals(1, procStore.getActiveLogs().size());
612  }
613
614  @Test
615  public void testFileNotFoundDuringLeaseRecovery() throws IOException {
616    final TestProcedure[] procs = new TestProcedure[3];
617    for (int i = 0; i < procs.length; ++i) {
618      procs[i] = new TestProcedure(i + 1, 0);
619      procStore.insert(procs[i], null);
620    }
621    procStore.rollWriterForTesting();
622    for (int i = 0; i < procs.length; ++i) {
623      procStore.update(procs[i]);
624      procStore.rollWriterForTesting();
625    }
626    procStore.stop(false);
627
628    FileStatus[] status = fs.listStatus(logDir);
629    assertEquals(procs.length + 1, status.length);
630
631    // simulate another active master removing the wals
632    procStore = new WALProcedureStore(htu.getConfiguration(), logDir, null, new LeaseRecovery() {
633      private int count = 0;
634
635      @Override
636      public void recoverFileLease(FileSystem fs, Path path) throws IOException {
637        if (++count <= 2) {
638          fs.delete(path, false);
639          LOG.debug("Simulate FileNotFound at count=" + count + " for " + path);
640          throw new FileNotFoundException("test file not found " + path);
641        }
642        LOG.debug("Simulate recoverFileLease() at count=" + count + " for " + path);
643      }
644    });
645
646    final LoadCounter loader = new LoadCounter();
647    procStore.start(PROCEDURE_STORE_SLOTS);
648    procStore.recoverLease();
649    procStore.load(loader);
650    assertEquals(procs.length, loader.getMaxProcId());
651    assertEquals(1, loader.getRunnableCount());
652    assertEquals(0, loader.getCompletedCount());
653    assertEquals(0, loader.getCorruptedCount());
654  }
655
656  @Test
657  public void testLogFileAlreadyExists() throws IOException {
658    final boolean[] tested = { false };
659    WALProcedureStore mStore = Mockito.spy(procStore);
660
661    Answer<Boolean> ans = new Answer<Boolean>() {
662      @Override
663      public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
664        long logId = ((Long) invocationOnMock.getArgument(0)).longValue();
665        switch ((int) logId) {
666          case 2:
667            // Create a file so that real rollWriter() runs into file exists condition
668            Path logFilePath = mStore.getLogFilePath(logId);
669            mStore.getFileSystem().create(logFilePath);
670            break;
671          case 3:
672            // Success only when we retry with logId 3
673            tested[0] = true;
674          default:
675            break;
676        }
677        return (Boolean) invocationOnMock.callRealMethod();
678      }
679    };
680
681    // First time Store has one log file, next id will be 2
682    Mockito.doAnswer(ans).when(mStore).rollWriter(2);
683    // next time its 3
684    Mockito.doAnswer(ans).when(mStore).rollWriter(3);
685
686    mStore.recoverLease();
687    assertTrue(tested[0]);
688  }
689
690  @Test
691  public void testLoadChildren() throws Exception {
692    TestProcedure a = new TestProcedure(1, 0);
693    TestProcedure b = new TestProcedure(2, 1);
694    TestProcedure c = new TestProcedure(3, 1);
695
696    // INIT
697    procStore.insert(a, null);
698
699    // Run A first step
700    a.addStackId(0);
701    procStore.update(a);
702
703    // Run A second step
704    a.addStackId(1);
705    procStore.insert(a, new Procedure[] { b, c });
706
707    // Run B first step
708    b.addStackId(2);
709    procStore.update(b);
710
711    // Run C first and last step
712    c.addStackId(3);
713    procStore.update(c);
714
715    // Run B second setp
716    b.addStackId(4);
717    procStore.update(b);
718
719    // back to A
720    a.addStackId(5);
721    a.setSuccessState();
722    procStore.delete(a, new long[] { b.getProcId(), c.getProcId() });
723    restartAndAssert(3, 0, 1, 0);
724  }
725
726  @Test
727  public void testBatchDelete() throws Exception {
728    for (int i = 1; i < 10; ++i) {
729      procStore.insert(new TestProcedure(i), null);
730    }
731
732    // delete nothing
733    long[] toDelete = new long[] { 1, 2, 3, 4 };
734    procStore.delete(toDelete, 2, 0);
735    LoadCounter loader = restartAndAssert(9, 9, 0, 0);
736    for (int i = 1; i < 10; ++i) {
737      assertEquals(true, loader.isRunnable(i));
738    }
739
740    // delete the full "toDelete" array (2, 4, 6, 8)
741    toDelete = new long[] { 2, 4, 6, 8 };
742    procStore.delete(toDelete, 0, toDelete.length);
743    loader = restartAndAssert(9, 5, 0, 0);
744    for (int i = 1; i < 10; ++i) {
745      assertEquals(i % 2 != 0, loader.isRunnable(i));
746    }
747
748    // delete a slice of "toDelete" (1, 3)
749    toDelete = new long[] { 5, 7, 1, 3, 9 };
750    procStore.delete(toDelete, 2, 2);
751    loader = restartAndAssert(9, 3, 0, 0);
752    for (int i = 1; i < 10; ++i) {
753      assertEquals(i > 3 && i % 2 != 0, loader.isRunnable(i));
754    }
755
756    // delete a single item (5)
757    toDelete = new long[] { 5 };
758    procStore.delete(toDelete, 0, 1);
759    loader = restartAndAssert(9, 2, 0, 0);
760    for (int i = 1; i < 10; ++i) {
761      assertEquals(i > 5 && i % 2 != 0, loader.isRunnable(i));
762    }
763
764    // delete remaining using a slice of "toDelete" (7, 9)
765    toDelete = new long[] { 0, 7, 9 };
766    procStore.delete(toDelete, 1, 2);
767    loader = restartAndAssert(0, 0, 0, 0);
768    for (int i = 1; i < 10; ++i) {
769      assertEquals(false, loader.isRunnable(i));
770    }
771  }
772
773  @Test
774  public void testBatchInsert() throws Exception {
775    final int count = 10;
776    final TestProcedure[] procs = new TestProcedure[count];
777    for (int i = 0; i < procs.length; ++i) {
778      procs[i] = new TestProcedure(i + 1);
779    }
780    procStore.insert(procs);
781    restartAndAssert(count, count, 0, 0);
782
783    for (int i = 0; i < procs.length; ++i) {
784      final long procId = procs[i].getProcId();
785      procStore.delete(procId);
786      restartAndAssert(procId != count ? count : 0, count - (i + 1), 0, 0);
787    }
788    procStore.removeInactiveLogsForTesting();
789    assertEquals(1, procStore.getActiveLogs().size(), "WALs=" + procStore.getActiveLogs());
790  }
791
792  @Test
793  public void testWALDirAndWALArchiveDir() throws IOException {
794    Configuration conf = htu.getConfiguration();
795    procStore = createWALProcedureStore(conf);
796    assertEquals(procStore.getFileSystem(), procStore.getWalArchiveDir().getFileSystem(conf));
797  }
798
799  private WALProcedureStore createWALProcedureStore(Configuration conf) throws IOException {
800    return new WALProcedureStore(conf, new LeaseRecovery() {
801      @Override
802      public void recoverFileLease(FileSystem fs, Path path) throws IOException {
803        // no-op
804      }
805    });
806  }
807
808  private LoadCounter restartAndAssert(long maxProcId, long runnableCount, int completedCount,
809    int corruptedCount) throws Exception {
810    return ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId, runnableCount,
811      completedCount, corruptedCount);
812  }
813
814  private void corruptLog(final FileStatus logFile, final long dropBytes) throws IOException {
815    assertTrue(logFile.getLen() > dropBytes);
816    LOG.debug(
817      "corrupt log " + logFile.getPath() + " size=" + logFile.getLen() + " drop=" + dropBytes);
818    Path tmpPath = new Path(testDir, "corrupted.log");
819    InputStream in = fs.open(logFile.getPath());
820    OutputStream out = fs.create(tmpPath);
821    IOUtils.copyBytes(in, out, logFile.getLen() - dropBytes, true);
822    if (!fs.rename(tmpPath, logFile.getPath())) {
823      throw new IOException("Unable to rename");
824    }
825  }
826
827  private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
828    LOG.debug("expected: " + procIds);
829    LoadCounter loader = new LoadCounter();
830    storeRestart(loader);
831    assertEquals(procIds.size(), loader.getLoadedCount());
832    assertEquals(0, loader.getCorruptedCount());
833  }
834
835  public static class TestSequentialProcedure extends SequentialProcedure<Void> {
836
837    private static final AtomicLong seqId = new AtomicLong(0);
838
839    public TestSequentialProcedure() {
840      setProcId(seqId.incrementAndGet());
841    }
842
843    @Override
844    protected Procedure<Void>[] execute(Void env) {
845      return null;
846    }
847
848    @Override
849    protected void rollback(Void env) {
850    }
851
852    @Override
853    protected boolean abort(Void env) {
854      return false;
855    }
856
857    @Override
858    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
859      long procId = getProcId();
860      if (procId % 2 == 0) {
861        Int64Value.Builder builder = Int64Value.newBuilder().setValue(procId);
862        serializer.serialize(builder.build());
863      }
864    }
865
866    @Override
867    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
868      long procId = getProcId();
869      if (procId % 2 == 0) {
870        Int64Value value = serializer.deserialize(Int64Value.class);
871        assertEquals(procId, value.getValue());
872      }
873    }
874  }
875}