001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static org.apache.hadoop.hbase.wal.WALFactory.META_WAL_PROVIDER;
021import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertNotNull;
025import static org.junit.Assert.assertTrue;
026import static org.junit.Assert.fail;
027
028import java.io.IOException;
029import java.io.InputStream;
030import java.lang.reflect.Method;
031import java.net.BindException;
032import java.util.ArrayList;
033import java.util.List;
034import java.util.NavigableMap;
035import java.util.TreeMap;
036import java.util.concurrent.atomic.AtomicBoolean;
037import java.util.stream.Collectors;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FSDataInputStream;
040import org.apache.hadoop.fs.FSDataOutputStream;
041import org.apache.hadoop.fs.FileStatus;
042import org.apache.hadoop.fs.FileSystem;
043import org.apache.hadoop.fs.Path;
044import org.apache.hadoop.hbase.Cell;
045import org.apache.hadoop.hbase.CellUtil;
046import org.apache.hadoop.hbase.Coprocessor;
047import org.apache.hadoop.hbase.HBaseClassTestRule;
048import org.apache.hadoop.hbase.HBaseTestingUtility;
049import org.apache.hadoop.hbase.HConstants;
050import org.apache.hadoop.hbase.KeyValue;
051import org.apache.hadoop.hbase.ServerName;
052import org.apache.hadoop.hbase.TableName;
053import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
054import org.apache.hadoop.hbase.client.RegionInfo;
055import org.apache.hadoop.hbase.client.RegionInfoBuilder;
056import org.apache.hadoop.hbase.client.TableDescriptor;
057import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
058import org.apache.hadoop.hbase.codec.Codec;
059import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
060import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
061import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
062import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
063import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
064import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
065import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
066import org.apache.hadoop.hbase.testclassification.MediumTests;
067import org.apache.hadoop.hbase.testclassification.RegionServerTests;
068import org.apache.hadoop.hbase.util.Bytes;
069import org.apache.hadoop.hbase.util.CommonFSUtils;
070import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
071import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
072import org.apache.hadoop.hbase.util.Threads;
073import org.apache.hadoop.hbase.wal.WALFactory.Providers;
074import org.apache.hadoop.hdfs.DistributedFileSystem;
075import org.apache.hadoop.hdfs.MiniDFSCluster;
076import org.apache.hadoop.hdfs.protocol.HdfsConstants;
077import org.junit.After;
078import org.junit.AfterClass;
079import org.junit.Before;
080import org.junit.BeforeClass;
081import org.junit.ClassRule;
082import org.junit.Rule;
083import org.junit.Test;
084import org.junit.experimental.categories.Category;
085import org.junit.rules.TestName;
086import org.slf4j.Logger;
087import org.slf4j.LoggerFactory;
088
089/**
090 * WAL tests that can be reused across providers.
091 */
092@Category({ RegionServerTests.class, MediumTests.class })
093public class TestWALFactory {
094
095  @ClassRule
096  public static final HBaseClassTestRule CLASS_RULE =
097    HBaseClassTestRule.forClass(TestWALFactory.class);
098
099  private static final Logger LOG = LoggerFactory.getLogger(TestWALFactory.class);
100
101  protected static Configuration conf;
102  private static MiniDFSCluster cluster;
103  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
104  protected static Path hbaseDir;
105  protected static Path hbaseWALDir;
106
107  protected FileSystem fs;
108  protected Path dir;
109  protected WALFactory wals;
110  private ServerName currentServername;
111
112  @Rule
113  public final TestName currentTest = new TestName();
114
115  @Before
116  public void setUp() throws Exception {
117    fs = cluster.getFileSystem();
118    dir = new Path(hbaseDir, currentTest.getMethodName());
119    this.currentServername = ServerName.valueOf(currentTest.getMethodName(), 16010, 1);
120    wals = new WALFactory(conf, this.currentServername.toString());
121  }
122
123  @After
124  public void tearDown() throws Exception {
125    // testAppendClose closes the FileSystem, which will prevent us from closing cleanly here.
126    try {
127      wals.close();
128    } catch (IOException exception) {
129      LOG.warn("Encountered exception while closing wal factory. If you have other errors, this"
130        + " may be the cause. Message: " + exception);
131      LOG.debug("Exception details for failure to close wal factory.", exception);
132    }
133    FileStatus[] entries = fs.listStatus(new Path("/"));
134    for (FileStatus dir : entries) {
135      fs.delete(dir.getPath(), true);
136    }
137  }
138
139  @BeforeClass
140  public static void setUpBeforeClass() throws Exception {
141    CommonFSUtils.setWALRootDir(TEST_UTIL.getConfiguration(), new Path("file:///tmp/wal"));
142    // Make block sizes small.
143    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
144    // needed for testAppendClose()
145    // quicker heartbeat interval for faster DN death notification
146    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
147    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
148    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
149
150    // faster failover with cluster.shutdown();fs.close() idiom
151    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1);
152    TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
153    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500);
154    TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000);
155    TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000);
156    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
157      SampleRegionWALCoprocessor.class.getName());
158    TEST_UTIL.startMiniDFSCluster(3);
159
160    conf = TEST_UTIL.getConfiguration();
161    cluster = TEST_UTIL.getDFSCluster();
162
163    hbaseDir = TEST_UTIL.createRootDir();
164    hbaseWALDir = TEST_UTIL.createWALRootDir();
165  }
166
167  @AfterClass
168  public static void tearDownAfterClass() throws Exception {
169    TEST_UTIL.shutdownMiniCluster();
170  }
171
172  @Test
173  public void canCloseSingleton() throws IOException {
174    WALFactory.getInstance(conf).close();
175  }
176
177  /**
178   * Just write multiple logs then split. Before fix for HADOOP-2283, this would fail. n
179   */
180  @Test
181  public void testSplit() throws IOException {
182    final TableName tableName = TableName.valueOf(currentTest.getMethodName());
183    final byte[] rowName = tableName.getName();
184    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
185    final int howmany = 3;
186    RegionInfo[] infos = new RegionInfo[3];
187    Path tableDataDir = CommonFSUtils.getTableDir(hbaseDir, tableName);
188    fs.mkdirs(tableDataDir);
189    Path tabledir = CommonFSUtils.getWALTableDir(conf, tableName);
190    fs.mkdirs(tabledir);
191    for (int i = 0; i < howmany; i++) {
192      infos[i] = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i))
193        .setEndKey(Bytes.toBytes("" + (i + 1))).build();
194      fs.mkdirs(new Path(tabledir, infos[i].getEncodedName()));
195      fs.mkdirs(new Path(tableDataDir, infos[i].getEncodedName()));
196      LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString());
197    }
198    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
199    scopes.put(Bytes.toBytes("column"), 0);
200
201    // Add edits for three regions.
202    for (int ii = 0; ii < howmany; ii++) {
203      for (int i = 0; i < howmany; i++) {
204        final WAL log = wals.getWAL(infos[i]);
205        for (int j = 0; j < howmany; j++) {
206          WALEdit edit = new WALEdit();
207          byte[] family = Bytes.toBytes("column");
208          byte[] qualifier = Bytes.toBytes(Integer.toString(j));
209          byte[] column = Bytes.toBytes("column:" + Integer.toString(j));
210          edit.add(
211            new KeyValue(rowName, family, qualifier, EnvironmentEdgeManager.currentTime(), column));
212          LOG.info("Region " + i + ": " + edit);
213          WALKeyImpl walKey = new WALKeyImpl(infos[i].getEncodedNameAsBytes(), tableName,
214            EnvironmentEdgeManager.currentTime(), mvcc, scopes);
215          log.appendData(infos[i], walKey, edit);
216          walKey.getWriteEntry();
217        }
218        log.sync();
219        log.rollWriter(true);
220      }
221    }
222    wals.shutdown();
223    // The below calculation of logDir relies on insider information... WALSplitter should be
224    // connected better
225    // with the WAL system.... not requiring explicit path. The oldLogDir is just made up not used.
226    Path logDir = new Path(new Path(hbaseWALDir, HConstants.HREGION_LOGDIR_NAME),
227      this.currentServername.toString());
228    Path oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
229    List<Path> splits = WALSplitter.split(hbaseWALDir, logDir, oldLogDir, fs, conf, wals);
230    verifySplits(splits, howmany);
231  }
232
233  /**
234   * Test new HDFS-265 sync. n
235   */
236  @Test
237  public void Broken_testSync() throws Exception {
238    TableName tableName = TableName.valueOf(currentTest.getMethodName());
239    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
240    // First verify that using streams all works.
241    Path p = new Path(dir, currentTest.getMethodName() + ".fsdos");
242    FSDataOutputStream out = fs.create(p);
243    out.write(tableName.getName());
244    Method syncMethod = null;
245    try {
246      syncMethod = out.getClass().getMethod("hflush", new Class<?>[] {});
247    } catch (NoSuchMethodException e) {
248      try {
249        syncMethod = out.getClass().getMethod("sync", new Class<?>[] {});
250      } catch (NoSuchMethodException ex) {
251        fail("This version of Hadoop supports neither Syncable.sync() " + "nor Syncable.hflush().");
252      }
253    }
254    syncMethod.invoke(out, new Object[] {});
255    FSDataInputStream in = fs.open(p);
256    assertTrue(in.available() > 0);
257    byte[] buffer = new byte[1024];
258    int read = in.read(buffer);
259    assertEquals(tableName.getName().length, read);
260    out.close();
261    in.close();
262
263    final int total = 20;
264    WAL.Reader reader = null;
265
266    try {
267      RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
268      NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
269      scopes.put(tableName.getName(), 0);
270      final WAL wal = wals.getWAL(info);
271
272      for (int i = 0; i < total; i++) {
273        WALEdit kvs = new WALEdit();
274        kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
275        wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
276          EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
277      }
278      // Now call sync and try reading. Opening a Reader before you sync just
279      // gives you EOFE.
280      wal.sync();
281      // Open a Reader.
282      Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
283      reader = wals.createReader(fs, walPath);
284      int count = 0;
285      WAL.Entry entry = new WAL.Entry();
286      while ((entry = reader.next(entry)) != null)
287        count++;
288      assertEquals(total, count);
289      reader.close();
290      // Add test that checks to see that an open of a Reader works on a file
291      // that has had a sync done on it.
292      for (int i = 0; i < total; i++) {
293        WALEdit kvs = new WALEdit();
294        kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
295        wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
296          EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
297      }
298      wal.sync();
299      reader = wals.createReader(fs, walPath);
300      count = 0;
301      while ((entry = reader.next(entry)) != null)
302        count++;
303      assertTrue(count >= total);
304      reader.close();
305      // If I sync, should see double the edits.
306      wal.sync();
307      reader = wals.createReader(fs, walPath);
308      count = 0;
309      while ((entry = reader.next(entry)) != null)
310        count++;
311      assertEquals(total * 2, count);
312      reader.close();
313      // Now do a test that ensures stuff works when we go over block boundary,
314      // especially that we return good length on file.
315      final byte[] value = new byte[1025 * 1024]; // Make a 1M value.
316      for (int i = 0; i < total; i++) {
317        WALEdit kvs = new WALEdit();
318        kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
319        wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
320          EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
321      }
322      // Now I should have written out lots of blocks. Sync then read.
323      wal.sync();
324      reader = wals.createReader(fs, walPath);
325      count = 0;
326      while ((entry = reader.next(entry)) != null)
327        count++;
328      assertEquals(total * 3, count);
329      reader.close();
330      // shutdown and ensure that Reader gets right length also.
331      wal.shutdown();
332      reader = wals.createReader(fs, walPath);
333      count = 0;
334      while ((entry = reader.next(entry)) != null)
335        count++;
336      assertEquals(total * 3, count);
337      reader.close();
338    } finally {
339      if (reader != null) reader.close();
340    }
341  }
342
343  private void verifySplits(final List<Path> splits, final int howmany) throws IOException {
344    assertEquals(howmany * howmany, splits.size());
345    for (int i = 0; i < splits.size(); i++) {
346      LOG.info("Verifying=" + splits.get(i));
347      WAL.Reader reader = wals.createReader(fs, splits.get(i));
348      try {
349        int count = 0;
350        String previousRegion = null;
351        long seqno = -1;
352        WAL.Entry entry = new WAL.Entry();
353        while ((entry = reader.next(entry)) != null) {
354          WALKey key = entry.getKey();
355          String region = Bytes.toString(key.getEncodedRegionName());
356          // Assert that all edits are for same region.
357          if (previousRegion != null) {
358            assertEquals(previousRegion, region);
359          }
360          LOG.info("oldseqno=" + seqno + ", newseqno=" + key.getSequenceId());
361          assertTrue(seqno < key.getSequenceId());
362          seqno = key.getSequenceId();
363          previousRegion = region;
364          count++;
365        }
366        assertEquals(howmany, count);
367      } finally {
368        reader.close();
369      }
370    }
371  }
372
373  /*
374   * We pass different values to recoverFileLease() so that different code paths are covered For
375   * this test to pass, requires: 1. HDFS-200 (append support) 2. HDFS-988 (SafeMode should freeze
376   * file operations [FSNamesystem.nextGenerationStampForBlock]) 3. HDFS-142 (on restart, maintain
377   * pendingCreates)
378   */
379  @Test
380  public void testAppendClose() throws Exception {
381    TableName tableName = TableName.valueOf(currentTest.getMethodName());
382    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
383
384    WAL wal = wals.getWAL(regionInfo);
385    int total = 20;
386
387    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
388    scopes.put(tableName.getName(), 0);
389    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
390    for (int i = 0; i < total; i++) {
391      WALEdit kvs = new WALEdit();
392      kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
393      wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
394        EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
395    }
396    // Now call sync to send the data to HDFS datanodes
397    wal.sync();
398    int namenodePort = cluster.getNameNodePort();
399    final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
400
401    // Stop the cluster. (ensure restart since we're sharing MiniDFSCluster)
402    try {
403      DistributedFileSystem dfs = cluster.getFileSystem();
404      dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
405      TEST_UTIL.shutdownMiniDFSCluster();
406      try {
407        // wal.writer.close() will throw an exception,
408        // but still call this since it closes the LogSyncer thread first
409        wal.shutdown();
410      } catch (IOException e) {
411        LOG.info(e.toString(), e);
412      }
413      fs.close(); // closing FS last so DFSOutputStream can't call close
414      LOG.info("STOPPED first instance of the cluster");
415    } finally {
416      // Restart the cluster
417      while (cluster.isClusterUp()) {
418        LOG.error("Waiting for cluster to go down");
419        Thread.sleep(1000);
420      }
421      assertFalse(cluster.isClusterUp());
422      cluster = null;
423      for (int i = 0; i < 100; i++) {
424        try {
425          cluster = TEST_UTIL.startMiniDFSClusterForTestWAL(namenodePort);
426          break;
427        } catch (BindException e) {
428          LOG.info("Sleeping.  BindException bringing up new cluster");
429          Threads.sleep(1000);
430        }
431      }
432      cluster.waitActive();
433      fs = cluster.getFileSystem();
434      LOG.info("STARTED second instance.");
435    }
436
437    // set the lease period to be 1 second so that the
438    // namenode triggers lease recovery upon append request
439    Method setLeasePeriod =
440      cluster.getClass().getDeclaredMethod("setLeasePeriod", new Class[] { Long.TYPE, Long.TYPE });
441    setLeasePeriod.setAccessible(true);
442    setLeasePeriod.invoke(cluster, 1000L, 1000L);
443    try {
444      Thread.sleep(1000);
445    } catch (InterruptedException e) {
446      LOG.info(e.toString(), e);
447    }
448
449    // Now try recovering the log, like the HMaster would do
450    final FileSystem recoveredFs = fs;
451    final Configuration rlConf = conf;
452
453    class RecoverLogThread extends Thread {
454      public Exception exception = null;
455
456      @Override
457      public void run() {
458        try {
459          RecoverLeaseFSUtils.recoverFileLease(recoveredFs, walPath, rlConf, null);
460        } catch (IOException e) {
461          exception = e;
462        }
463      }
464    }
465
466    RecoverLogThread t = new RecoverLogThread();
467    t.start();
468    // Timeout after 60 sec. Without correct patches, would be an infinite loop
469    t.join(60 * 1000);
470    if (t.isAlive()) {
471      t.interrupt();
472      throw new Exception("Timed out waiting for WAL.recoverLog()");
473    }
474
475    if (t.exception != null) throw t.exception;
476
477    // Make sure you can read all the content
478    WAL.Reader reader = wals.createReader(fs, walPath);
479    int count = 0;
480    WAL.Entry entry = new WAL.Entry();
481    while (reader.next(entry) != null) {
482      count++;
483      assertTrue("Should be one KeyValue per WALEdit", entry.getEdit().getCells().size() == 1);
484    }
485    assertEquals(total, count);
486    reader.close();
487
488    // Reset the lease period
489    setLeasePeriod.invoke(cluster, new Object[] { 60000L, 3600000L });
490  }
491
492  /**
493   * Tests that we can write out an edit, close, and then read it back in again.
494   */
495  @Test
496  public void testEditAdd() throws IOException {
497    int colCount = 10;
498    TableDescriptor htd =
499      TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName()))
500        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build();
501    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
502    for (byte[] fam : htd.getColumnFamilyNames()) {
503      scopes.put(fam, 0);
504    }
505    byte[] row = Bytes.toBytes("row");
506    WAL.Reader reader = null;
507    try {
508      final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
509
510      // Write columns named 1, 2, 3, etc. and then values of single byte
511      // 1, 2, 3...
512      long timestamp = EnvironmentEdgeManager.currentTime();
513      WALEdit cols = new WALEdit();
514      for (int i = 0; i < colCount; i++) {
515        cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)),
516          timestamp, new byte[] { (byte) (i + '0') }));
517      }
518      RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(row)
519        .setEndKey(Bytes.toBytes(Bytes.toString(row) + "1")).build();
520      final WAL log = wals.getWAL(info);
521
522      final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(),
523        htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
524      log.sync(txid);
525      log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
526      log.completeCacheFlush(info.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
527      log.shutdown();
528      Path filename = AbstractFSWALProvider.getCurrentFileName(log);
529      // Now open a reader on the log and assert append worked.
530      reader = wals.createReader(fs, filename);
531      // Above we added all columns on a single row so we only read one
532      // entry in the below... thats why we have '1'.
533      for (int i = 0; i < 1; i++) {
534        WAL.Entry entry = reader.next(null);
535        if (entry == null) break;
536        WALKey key = entry.getKey();
537        WALEdit val = entry.getEdit();
538        assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
539        assertTrue(htd.getTableName().equals(key.getTableName()));
540        Cell cell = val.getCells().get(0);
541        assertTrue(Bytes.equals(row, 0, row.length, cell.getRowArray(), cell.getRowOffset(),
542          cell.getRowLength()));
543        assertEquals((byte) (i + '0'), CellUtil.cloneValue(cell)[0]);
544        System.out.println(key + " " + val);
545      }
546    } finally {
547      if (reader != null) {
548        reader.close();
549      }
550    }
551  }
552
553  @Test
554  public void testAppend() throws IOException {
555    int colCount = 10;
556    TableDescriptor htd =
557      TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName()))
558        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build();
559    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
560    for (byte[] fam : htd.getColumnFamilyNames()) {
561      scopes.put(fam, 0);
562    }
563    byte[] row = Bytes.toBytes("row");
564    WAL.Reader reader = null;
565    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
566    try {
567      // Write columns named 1, 2, 3, etc. and then values of single byte
568      // 1, 2, 3...
569      long timestamp = EnvironmentEdgeManager.currentTime();
570      WALEdit cols = new WALEdit();
571      for (int i = 0; i < colCount; i++) {
572        cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)),
573          timestamp, new byte[] { (byte) (i + '0') }));
574      }
575      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
576      final WAL log = wals.getWAL(hri);
577      final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
578        htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
579      log.sync(txid);
580      log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
581      log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
582      log.shutdown();
583      Path filename = AbstractFSWALProvider.getCurrentFileName(log);
584      // Now open a reader on the log and assert append worked.
585      reader = wals.createReader(fs, filename);
586      WAL.Entry entry = reader.next();
587      assertEquals(colCount, entry.getEdit().size());
588      int idx = 0;
589      for (Cell val : entry.getEdit().getCells()) {
590        assertTrue(
591          Bytes.equals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName()));
592        assertTrue(htd.getTableName().equals(entry.getKey().getTableName()));
593        assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(),
594          val.getRowLength()));
595        assertEquals((byte) (idx + '0'), CellUtil.cloneValue(val)[0]);
596        System.out.println(entry.getKey() + " " + val);
597        idx++;
598      }
599    } finally {
600      if (reader != null) {
601        reader.close();
602      }
603    }
604  }
605
606  /**
607   * Test that we can visit entries before they are appended n
608   */
609  @Test
610  public void testVisitors() throws Exception {
611    final int COL_COUNT = 10;
612    final TableName tableName = TableName.valueOf(currentTest.getMethodName());
613    final byte[] row = Bytes.toBytes("row");
614    final DumbWALActionsListener visitor = new DumbWALActionsListener();
615    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
616    long timestamp = EnvironmentEdgeManager.currentTime();
617    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
618    scopes.put(Bytes.toBytes("column"), 0);
619
620    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build();
621    final WAL log = wals.getWAL(hri);
622    log.registerWALActionsListener(visitor);
623    for (int i = 0; i < COL_COUNT; i++) {
624      WALEdit cols = new WALEdit();
625      cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)),
626        timestamp, new byte[] { (byte) (i + '0') }));
627      log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName,
628        EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
629    }
630    log.sync();
631    assertEquals(COL_COUNT, visitor.increments);
632    log.unregisterWALActionsListener(visitor);
633    WALEdit cols = new WALEdit();
634    cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(11)),
635      timestamp, new byte[] { (byte) (11 + '0') }));
636    log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName,
637      EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
638    log.sync();
639    assertEquals(COL_COUNT, visitor.increments);
640  }
641
642  /**
643   * A loaded WAL coprocessor won't break existing WAL test cases.
644   */
645  @Test
646  public void testWALCoprocessorLoaded() throws Exception {
647    // test to see whether the coprocessor is loaded or not.
648    WALCoprocessorHost host = wals.getWAL(null).getCoprocessorHost();
649    Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class);
650    assertNotNull(c);
651  }
652
653  static class DumbWALActionsListener implements WALActionsListener {
654    int increments = 0;
655
656    @Override
657    public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
658      increments++;
659    }
660  }
661
662  @Test
663  public void testWALProviders() throws IOException {
664    Configuration conf = new Configuration();
665    WALFactory walFactory = new WALFactory(conf, this.currentServername.toString());
666    assertEquals(walFactory.getWALProvider().getClass(), walFactory.getMetaProvider().getClass());
667  }
668
669  @Test
670  public void testOnlySetWALProvider() throws IOException {
671    Configuration conf = new Configuration();
672    conf.set(WAL_PROVIDER, WALFactory.Providers.multiwal.name());
673    WALFactory walFactory = new WALFactory(conf, this.currentServername.toString());
674
675    assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getWALProvider().getClass());
676    assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getMetaProvider().getClass());
677  }
678
679  @Test
680  public void testOnlySetMetaWALProvider() throws IOException {
681    Configuration conf = new Configuration();
682    conf.set(META_WAL_PROVIDER, WALFactory.Providers.asyncfs.name());
683    WALFactory walFactory = new WALFactory(conf, this.currentServername.toString());
684
685    assertEquals(WALFactory.Providers.defaultProvider.clazz,
686      walFactory.getWALProvider().getClass());
687    assertEquals(WALFactory.Providers.asyncfs.clazz, walFactory.getMetaProvider().getClass());
688  }
689
690  @Test
691  public void testDefaultProvider() throws IOException {
692    final Configuration conf = new Configuration();
693    // AsyncFSWal is the default, we should be able to request any WAL.
694    final WALFactory normalWalFactory = new WALFactory(conf, this.currentServername.toString());
695    Class<? extends WALProvider> fshLogProvider =
696      normalWalFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name());
697    assertEquals(Providers.filesystem.clazz, fshLogProvider);
698
699    // Imagine a world where MultiWAL is the default
700    final WALFactory customizedWalFactory =
701      new WALFactory(conf, this.currentServername.toString()) {
702        @Override
703        Providers getDefaultProvider() {
704          return Providers.multiwal;
705        }
706      };
707    // If we don't specify a WALProvider, we should get the default implementation.
708    Class<? extends WALProvider> multiwalProviderClass =
709      customizedWalFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.multiwal.name());
710    assertEquals(Providers.multiwal.clazz, multiwalProviderClass);
711  }
712
713  @Test
714  public void testCustomProvider() throws IOException {
715    final Configuration config = new Configuration();
716    config.set(WALFactory.WAL_PROVIDER, IOTestProvider.class.getName());
717    final WALFactory walFactory = new WALFactory(config, this.currentServername.toString());
718    Class<? extends WALProvider> walProvider =
719      walFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name());
720    assertEquals(IOTestProvider.class, walProvider);
721    WALProvider metaWALProvider = walFactory.getMetaProvider();
722    assertEquals(IOTestProvider.class, metaWALProvider.getClass());
723  }
724
725  @Test
726  public void testCustomMetaProvider() throws IOException {
727    final Configuration config = new Configuration();
728    config.set(WALFactory.META_WAL_PROVIDER, IOTestProvider.class.getName());
729    final WALFactory walFactory = new WALFactory(config, this.currentServername.toString());
730    Class<? extends WALProvider> walProvider =
731      walFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name());
732    assertEquals(Providers.filesystem.clazz, walProvider);
733    WALProvider metaWALProvider = walFactory.getMetaProvider();
734    assertEquals(IOTestProvider.class, metaWALProvider.getClass());
735  }
736
737  @Test
738  public void testReaderClosedOnBadCodec() throws IOException {
739    // Create our own Configuration and WALFactory to avoid breaking other test methods
740    Configuration confWithCodec = new Configuration(conf);
741    confWithCodec.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, BrokenWALCellCodec.class,
742      Codec.class);
743    WALFactory customFactory = new WALFactory(confWithCodec, this.currentServername.toString());
744
745    // Hack a Proxy over the FileSystem so that we can track the InputStreams opened by
746    // the FileSystem and know if close() was called on those InputStreams.
747    List<InputStreamProxy> openedReaders = new ArrayList<>();
748    FileSystemProxy proxyFs = new FileSystemProxy(fs) {
749      @Override
750      public FSDataInputStream open(Path p) throws IOException {
751        InputStreamProxy is = new InputStreamProxy(super.open(p));
752        openedReaders.add(is);
753        return is;
754      }
755
756      @Override
757      public FSDataInputStream open(Path p, int blockSize) throws IOException {
758        InputStreamProxy is = new InputStreamProxy(super.open(p, blockSize));
759        openedReaders.add(is);
760        return is;
761      }
762    };
763
764    final TableDescriptor htd =
765      TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName()))
766        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build();
767    final RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
768
769    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
770    for (byte[] fam : htd.getColumnFamilyNames()) {
771      scopes.put(fam, 0);
772    }
773    byte[] row = Bytes.toBytes("row");
774    WAL.Reader reader = null;
775    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
776    try {
777      // Write one column in one edit.
778      WALEdit cols = new WALEdit();
779      cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes("0"),
780        EnvironmentEdgeManager.currentTime(), new byte[] { 0 }));
781      final WAL log = customFactory.getWAL(hri);
782      final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
783        htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
784      // Sync the edit to the WAL
785      log.sync(txid);
786      log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
787      log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
788      log.shutdown();
789
790      // Inject our failure, object is constructed via reflection.
791      BrokenWALCellCodec.THROW_FAILURE_ON_INIT.set(true);
792
793      // Now open a reader on the log which will throw an exception when
794      // we try to instantiate the custom Codec.
795      Path filename = AbstractFSWALProvider.getCurrentFileName(log);
796      try {
797        reader = customFactory.createReader(proxyFs, filename);
798        fail("Expected to see an exception when creating WAL reader");
799      } catch (Exception e) {
800        // Expected that we get an exception
801      }
802      // We should have exactly one reader
803      assertEquals(1, openedReaders.size());
804      // And that reader should be closed.
805      long unclosedReaders =
806        openedReaders.stream().filter((r) -> !r.isClosed.get()).collect(Collectors.counting());
807      assertEquals("Should not find any open readers", 0, (int) unclosedReaders);
808    } finally {
809      if (reader != null) {
810        reader.close();
811      }
812    }
813  }
814
815  /**
816   * A proxy around FSDataInputStream which can report if close() was called.
817   */
818  private static class InputStreamProxy extends FSDataInputStream {
819    private final InputStream real;
820    private final AtomicBoolean isClosed = new AtomicBoolean(false);
821
822    public InputStreamProxy(InputStream real) {
823      super(real);
824      this.real = real;
825    }
826
827    @Override
828    public void close() throws IOException {
829      isClosed.set(true);
830      real.close();
831    }
832  }
833
834  /**
835   * A custom WALCellCodec in which we can inject failure.
836   */
837  @SuppressWarnings("unused")
838  private static class BrokenWALCellCodec extends WALCellCodec {
839    static final AtomicBoolean THROW_FAILURE_ON_INIT = new AtomicBoolean(false);
840
841    static void maybeInjectFailure() {
842      if (THROW_FAILURE_ON_INIT.get()) {
843        throw new RuntimeException("Injected instantiation exception");
844      }
845    }
846
847    public BrokenWALCellCodec() {
848      super();
849      maybeInjectFailure();
850    }
851
852    public BrokenWALCellCodec(Configuration conf, CompressionContext compression) {
853      super(conf, compression);
854      maybeInjectFailure();
855    }
856  }
857}