001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static org.apache.hadoop.hbase.wal.WALFactory.META_WAL_PROVIDER;
021import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023import static org.junit.jupiter.api.Assertions.assertFalse;
024import static org.junit.jupiter.api.Assertions.assertNotNull;
025import static org.junit.jupiter.api.Assertions.assertNotSame;
026import static org.junit.jupiter.api.Assertions.assertThrows;
027import static org.junit.jupiter.api.Assertions.assertTrue;
028import static org.junit.jupiter.api.Assertions.fail;
029
030import java.io.IOException;
031import java.io.InputStream;
032import java.lang.reflect.Method;
033import java.net.BindException;
034import java.util.ArrayList;
035import java.util.List;
036import java.util.NavigableMap;
037import java.util.TreeMap;
038import java.util.concurrent.atomic.AtomicBoolean;
039import java.util.stream.Collectors;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.FSDataInputStream;
042import org.apache.hadoop.fs.FSDataOutputStream;
043import org.apache.hadoop.fs.FileStatus;
044import org.apache.hadoop.fs.FileSystem;
045import org.apache.hadoop.fs.Path;
046import org.apache.hadoop.hbase.Cell;
047import org.apache.hadoop.hbase.CellUtil;
048import org.apache.hadoop.hbase.Coprocessor;
049import org.apache.hadoop.hbase.HBaseTestingUtil;
050import org.apache.hadoop.hbase.HConstants;
051import org.apache.hadoop.hbase.KeyValue;
052import org.apache.hadoop.hbase.ServerName;
053import org.apache.hadoop.hbase.TableName;
054import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
055import org.apache.hadoop.hbase.client.RegionInfo;
056import org.apache.hadoop.hbase.client.RegionInfoBuilder;
057import org.apache.hadoop.hbase.client.TableDescriptor;
058import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
059import org.apache.hadoop.hbase.codec.Codec;
060import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
061import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
062import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
063import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
064import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
065import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
066import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
067import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
068import org.apache.hadoop.hbase.testclassification.MediumTests;
069import org.apache.hadoop.hbase.testclassification.RegionServerTests;
070import org.apache.hadoop.hbase.util.Bytes;
071import org.apache.hadoop.hbase.util.CommonFSUtils;
072import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
073import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
074import org.apache.hadoop.hbase.util.Threads;
075import org.apache.hadoop.hbase.wal.WALFactory.Providers;
076import org.apache.hadoop.hdfs.DistributedFileSystem;
077import org.apache.hadoop.hdfs.MiniDFSCluster;
078import org.apache.hadoop.hdfs.protocol.HdfsConstants;
079import org.junit.jupiter.api.AfterAll;
080import org.junit.jupiter.api.AfterEach;
081import org.junit.jupiter.api.BeforeAll;
082import org.junit.jupiter.api.BeforeEach;
083import org.junit.jupiter.api.Tag;
084import org.junit.jupiter.api.Test;
085import org.junit.jupiter.api.TestInfo;
086import org.slf4j.Logger;
087import org.slf4j.LoggerFactory;
088
089/**
090 * WAL tests that can be reused across providers.
091 */
092@Tag(RegionServerTests.TAG)
093@Tag(MediumTests.TAG)
094public class TestWALFactory {
095
  private static final Logger LOG = LoggerFactory.getLogger(TestWALFactory.class);

  // Shared test configuration; assigned from TEST_UTIL in setUpBeforeClass().
  protected static Configuration conf;
  // Mini DFS cluster backing all tests; testAppendClose() shuts it down and restarts it.
  private static MiniDFSCluster cluster;
  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  // HBase root dir and WAL root dir, created once per class in setUpBeforeClass().
  protected static Path hbaseDir;
  protected static Path hbaseWALDir;

  // Per-test state, (re)initialized in setUp() for each test method.
  protected FileSystem fs;
  protected Path dir;
  protected WALFactory wals;
  private ServerName currentServername;

  // Name of the currently executing test method; reused for table names, dirs and server name.
  private String currentTestName;
110
111  @BeforeEach
112  public void setUp(TestInfo testInfo) throws Exception {
113    currentTestName = testInfo.getTestMethod().get().getName();
114    fs = cluster.getFileSystem();
115    dir = new Path(hbaseDir, currentTestName);
116    this.currentServername = ServerName.valueOf(currentTestName, 16010, 1);
117    wals = new WALFactory(conf, this.currentServername.toString());
118  }
119
120  @AfterEach
121  public void tearDown() throws Exception {
122    // testAppendClose closes the FileSystem, which will prevent us from closing cleanly here.
123    try {
124      wals.close();
125    } catch (IOException exception) {
126      LOG.warn("Encountered exception while closing wal factory. If you have other errors, this"
127        + " may be the cause. Message: " + exception);
128      LOG.debug("Exception details for failure to close wal factory.", exception);
129    }
130    FileStatus[] entries = fs.listStatus(new Path("/"));
131    for (FileStatus dir : entries) {
132      fs.delete(dir.getPath(), true);
133    }
134  }
135
  /**
   * Starts a 3-datanode mini DFS cluster with aggressive timeout/retry settings (so the
   * failover scenario in testAppendClose runs quickly) and creates the HBase root and WAL
   * root directories used by all tests.
   */
  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    CommonFSUtils.setWALRootDir(TEST_UTIL.getConfiguration(), new Path("file:///tmp/wal"));
    // Make block sizes small.
    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
    // needed for testAppendClose()
    // quicker heartbeat interval for faster DN death notification
    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);

    // faster failover with cluster.shutdown();fs.close() idiom
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500);
    TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000);
    TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000);
    // Register the sample WAL coprocessor so testWALCoprocessorLoaded can find it.
    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
      SampleRegionWALCoprocessor.class.getName());
    TEST_UTIL.startMiniDFSCluster(3);

    conf = TEST_UTIL.getConfiguration();
    cluster = TEST_UTIL.getDFSCluster();

    hbaseDir = TEST_UTIL.createRootDir();
    hbaseWALDir = TEST_UTIL.createWALRootDir();
  }
163
  /** Shuts down the mini cluster started in {@link #setUpBeforeClass()}. */
  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
168
  /** Closing the process-wide singleton WALFactory instance should succeed without error. */
  @Test
  public void canCloseSingleton() throws IOException {
    WALFactory.getInstance(conf).close();
  }
173
174  /**
175   * Just write multiple logs then split. Before fix for HADOOP-2283, this would fail.
176   */
177  @Test
178  public void testSplit() throws IOException {
179    final TableName tableName = TableName.valueOf(currentTestName);
180    final byte[] rowName = tableName.getName();
181    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
182    final int howmany = 3;
183    RegionInfo[] infos = new RegionInfo[3];
184    Path tableDataDir = CommonFSUtils.getTableDir(hbaseDir, tableName);
185    fs.mkdirs(tableDataDir);
186    Path tabledir = CommonFSUtils.getWALTableDir(conf, tableName);
187    fs.mkdirs(tabledir);
188    for (int i = 0; i < howmany; i++) {
189      infos[i] = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i))
190        .setEndKey(Bytes.toBytes("" + (i + 1))).build();
191      fs.mkdirs(new Path(tabledir, infos[i].getEncodedName()));
192      fs.mkdirs(new Path(tableDataDir, infos[i].getEncodedName()));
193      LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString());
194    }
195    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
196    scopes.put(Bytes.toBytes("column"), 0);
197
198    // Add edits for three regions.
199    for (int ii = 0; ii < howmany; ii++) {
200      for (int i = 0; i < howmany; i++) {
201        final WAL log = wals.getWAL(infos[i]);
202        for (int j = 0; j < howmany; j++) {
203          WALEdit edit = new WALEdit();
204          byte[] family = Bytes.toBytes("column");
205          byte[] qualifier = Bytes.toBytes(Integer.toString(j));
206          byte[] column = Bytes.toBytes("column:" + Integer.toString(j));
207          edit.add(
208            new KeyValue(rowName, family, qualifier, EnvironmentEdgeManager.currentTime(), column));
209          LOG.info("Region " + i + ": " + edit);
210          WALKeyImpl walKey = new WALKeyImpl(infos[i].getEncodedNameAsBytes(), tableName,
211            EnvironmentEdgeManager.currentTime(), mvcc, scopes);
212          log.appendData(infos[i], walKey, edit);
213          walKey.getWriteEntry();
214        }
215        log.sync();
216        log.rollWriter(true);
217      }
218    }
219    wals.shutdown();
220    // The below calculation of logDir relies on insider information... WALSplitter should be
221    // connected better
222    // with the WAL system.... not requiring explicit path. The oldLogDir is just made up not used.
223    Path logDir = new Path(new Path(hbaseWALDir, HConstants.HREGION_LOGDIR_NAME),
224      this.currentServername.toString());
225    Path oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
226    List<Path> splits = WALSplitter.split(hbaseWALDir, logDir, oldLogDir, fs, conf, wals);
227    verifySplits(splits, howmany);
228  }
229
230  /**
231   * Test new HDFS-265 sync.
232   */
233  @Test
234  public void Broken_testSync() throws Exception {
235    TableName tableName = TableName.valueOf(currentTestName);
236    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
237    // First verify that using streams all works.
238    Path p = new Path(dir, currentTestName + ".fsdos");
239    FSDataOutputStream out = fs.create(p);
240    out.write(tableName.getName());
241    Method syncMethod = null;
242    try {
243      syncMethod = out.getClass().getMethod("hflush", new Class<?>[] {});
244    } catch (NoSuchMethodException e) {
245      try {
246        syncMethod = out.getClass().getMethod("sync", new Class<?>[] {});
247      } catch (NoSuchMethodException ex) {
248        fail("This version of Hadoop supports neither Syncable.sync() " + "nor Syncable.hflush().");
249      }
250    }
251    syncMethod.invoke(out, new Object[] {});
252    FSDataInputStream in = fs.open(p);
253    assertTrue(in.available() > 0);
254    byte[] buffer = new byte[1024];
255    int read = in.read(buffer);
256    assertEquals(tableName.getName().length, read);
257    out.close();
258    in.close();
259
260    final int total = 20;
261    RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
262    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
263    scopes.put(tableName.getName(), 0);
264    final WAL wal = wals.getWAL(info);
265
266    for (int i = 0; i < total; i++) {
267      WALEdit kvs = new WALEdit();
268      kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
269      wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
270        EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
271    }
272    // Now call sync and try reading. Opening a Reader before you sync just
273    // gives you EOFE.
274    wal.sync();
275    // Open a Reader.
276    Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
277    int count = NoEOFWALStreamReader.count(wals, fs, walPath);
278    assertEquals(total, count);
279    // Add test that checks to see that an open of a Reader works on a file
280    // that has had a sync done on it.
281    for (int i = 0; i < total; i++) {
282      WALEdit kvs = new WALEdit();
283      kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
284      wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
285        EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
286    }
287    wal.sync();
288    count = NoEOFWALStreamReader.count(wals, fs, walPath);
289    assertTrue(count >= total);
290    // If I sync, should see double the edits.
291    wal.sync();
292    count = NoEOFWALStreamReader.count(wals, fs, walPath);
293    assertEquals(total * 2, count);
294    // Now do a test that ensures stuff works when we go over block boundary,
295    // especially that we return good length on file.
296    final byte[] value = new byte[1025 * 1024]; // Make a 1M value.
297    for (int i = 0; i < total; i++) {
298      WALEdit kvs = new WALEdit();
299      kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
300      wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
301        EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
302    }
303    // Now I should have written out lots of blocks. Sync then read.
304    wal.sync();
305    count = NoEOFWALStreamReader.count(wals, fs, walPath);
306    assertEquals(total * 3, count);
307    // shutdown and ensure that Reader gets right length also.
308    wal.shutdown();
309    count = NoEOFWALStreamReader.count(wals, fs, walPath);
310    assertEquals(total * 3, count);
311  }
312
313  private void verifySplits(final List<Path> splits, final int howmany) throws IOException {
314    assertEquals(howmany * howmany, splits.size());
315    for (int i = 0; i < splits.size(); i++) {
316      LOG.info("Verifying=" + splits.get(i));
317      try (WALStreamReader reader = wals.createStreamReader(fs, splits.get(i))) {
318        int count = 0;
319        String previousRegion = null;
320        long seqno = -1;
321        WAL.Entry entry = new WAL.Entry();
322        while ((entry = reader.next(entry)) != null) {
323          WALKey key = entry.getKey();
324          String region = Bytes.toString(key.getEncodedRegionName());
325          // Assert that all edits are for same region.
326          if (previousRegion != null) {
327            assertEquals(previousRegion, region);
328          }
329          LOG.info("oldseqno=" + seqno + ", newseqno=" + key.getSequenceId());
330          assertTrue(seqno < key.getSequenceId());
331          seqno = key.getSequenceId();
332          previousRegion = region;
333          count++;
334        }
335        assertEquals(howmany, count);
336      }
337    }
338  }
339
340  /*
341   * We pass different values to recoverFileLease() so that different code paths are covered For
342   * this test to pass, requires: 1. HDFS-200 (append support) 2. HDFS-988 (SafeMode should freeze
343   * file operations [FSNamesystem.nextGenerationStampForBlock]) 3. HDFS-142 (on restart, maintain
344   * pendingCreates)
345   */
346  @Test
347  public void testAppendClose() throws Exception {
348    TableName tableName = TableName.valueOf(currentTestName);
349    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
350
351    WAL wal = wals.getWAL(regionInfo);
352    int total = 20;
353
354    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
355    scopes.put(tableName.getName(), 0);
356    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
357    for (int i = 0; i < total; i++) {
358      WALEdit kvs = new WALEdit();
359      kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
360      wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
361        EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
362    }
363    // Now call sync to send the data to HDFS datanodes
364    wal.sync();
365    int namenodePort = cluster.getNameNodePort();
366    final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
367
368    // Stop the cluster. (ensure restart since we're sharing MiniDFSCluster)
369    try {
370      DistributedFileSystem dfs = cluster.getFileSystem();
371      dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
372      TEST_UTIL.shutdownMiniDFSCluster();
373      try {
374        // wal.writer.close() will throw an exception,
375        // but still call this since it closes the LogSyncer thread first
376        wal.shutdown();
377      } catch (IOException e) {
378        LOG.info(e.toString(), e);
379      }
380      fs.close(); // closing FS last so DFSOutputStream can't call close
381      LOG.info("STOPPED first instance of the cluster");
382    } finally {
383      // Restart the cluster
384      while (cluster.isClusterUp()) {
385        LOG.error("Waiting for cluster to go down");
386        Thread.sleep(1000);
387      }
388      assertFalse(cluster.isClusterUp());
389      cluster = null;
390      for (int i = 0; i < 100; i++) {
391        try {
392          cluster = TEST_UTIL.startMiniDFSClusterForTestWAL(namenodePort);
393          break;
394        } catch (BindException e) {
395          LOG.info("Sleeping.  BindException bringing up new cluster");
396          Threads.sleep(1000);
397        }
398      }
399      cluster.waitActive();
400      fs = cluster.getFileSystem();
401      LOG.info("STARTED second instance.");
402    }
403
404    // set the lease period to be 1 second so that the
405    // namenode triggers lease recovery upon append request
406    Method setLeasePeriod =
407      cluster.getClass().getDeclaredMethod("setLeasePeriod", new Class[] { Long.TYPE, Long.TYPE });
408    setLeasePeriod.setAccessible(true);
409    setLeasePeriod.invoke(cluster, 1000L, 1000L);
410    try {
411      Thread.sleep(1000);
412    } catch (InterruptedException e) {
413      LOG.info(e.toString(), e);
414    }
415
416    // Now try recovering the log, like the HMaster would do
417    final FileSystem recoveredFs = fs;
418    final Configuration rlConf = conf;
419
420    class RecoverLogThread extends Thread {
421      public Exception exception = null;
422
423      @Override
424      public void run() {
425        try {
426          RecoverLeaseFSUtils.recoverFileLease(recoveredFs, walPath, rlConf, null);
427        } catch (IOException e) {
428          exception = e;
429        }
430      }
431    }
432
433    RecoverLogThread t = new RecoverLogThread();
434    t.start();
435    // Timeout after 60 sec. Without correct patches, would be an infinite loop
436    t.join(60 * 1000);
437    if (t.isAlive()) {
438      t.interrupt();
439      throw new Exception("Timed out waiting for WAL.recoverLog()");
440    }
441
442    if (t.exception != null) throw t.exception;
443
444    // Make sure you can read all the content
445    int count = 0;
446    try (NoEOFWALStreamReader reader = NoEOFWALStreamReader.create(wals, fs, walPath)) {
447      WAL.Entry entry = new WAL.Entry();
448      while (reader.next(entry) != null) {
449        count++;
450        assertTrue(entry.getEdit().getCells().size() == 1, "Should be one KeyValue per WALEdit");
451      }
452    }
453    assertEquals(total, count);
454
455    // Reset the lease period
456    setLeasePeriod.invoke(cluster, new Object[] { 60000L, 3600000L });
457  }
458
459  /**
460   * Tests that we can write out an edit, close, and then read it back in again.
461   */
462  @Test
463  public void testEditAdd() throws IOException {
464    int colCount = 10;
465    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTestName))
466      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build();
467    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
468    for (byte[] fam : htd.getColumnFamilyNames()) {
469      scopes.put(fam, 0);
470    }
471    byte[] row = Bytes.toBytes("row");
472    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
473
474    // Write columns named 1, 2, 3, etc. and then values of single byte
475    // 1, 2, 3...
476    long timestamp = EnvironmentEdgeManager.currentTime();
477    WALEdit cols = new WALEdit();
478    for (int i = 0; i < colCount; i++) {
479      cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)),
480        timestamp, new byte[] { (byte) (i + '0') }));
481    }
482    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(row)
483      .setEndKey(Bytes.toBytes(Bytes.toString(row) + "1")).build();
484    final WAL log = wals.getWAL(info);
485
486    final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(),
487      htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
488    log.sync(txid);
489    log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
490    log.completeCacheFlush(info.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
491    log.shutdown();
492    Path filename = AbstractFSWALProvider.getCurrentFileName(log);
493    // Now open a reader on the log and assert append worked.
494    try (NoEOFWALStreamReader reader = NoEOFWALStreamReader.create(wals, fs, filename)) {
495      // Above we added all columns on a single row so we only read one
496      // entry in the below... thats why we have '1'.
497      for (int i = 0; i < 1; i++) {
498        WAL.Entry entry = reader.next(null);
499        if (entry == null) break;
500        WALKey key = entry.getKey();
501        WALEdit val = entry.getEdit();
502        assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
503        assertTrue(htd.getTableName().equals(key.getTableName()));
504        Cell cell = val.getCells().get(0);
505        assertTrue(Bytes.equals(row, 0, row.length, cell.getRowArray(), cell.getRowOffset(),
506          cell.getRowLength()));
507        assertEquals((byte) (i + '0'), CellUtil.cloneValue(cell)[0]);
508        LOG.info(key + " " + val);
509      }
510    }
511  }
512
513  @Test
514  public void testAppend() throws IOException {
515    int colCount = 10;
516    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTestName))
517      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build();
518    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
519    for (byte[] fam : htd.getColumnFamilyNames()) {
520      scopes.put(fam, 0);
521    }
522    byte[] row = Bytes.toBytes("row");
523    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
524    // Write columns named 1, 2, 3, etc. and then values of single byte
525    // 1, 2, 3...
526    long timestamp = EnvironmentEdgeManager.currentTime();
527    WALEdit cols = new WALEdit();
528    for (int i = 0; i < colCount; i++) {
529      cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)),
530        timestamp, new byte[] { (byte) (i + '0') }));
531    }
532    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
533    final WAL log = wals.getWAL(hri);
534    final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
535      htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
536    log.sync(txid);
537    log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
538    log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
539    log.shutdown();
540    Path filename = AbstractFSWALProvider.getCurrentFileName(log);
541    // Now open a reader on the log and assert append worked.
542    try (WALStreamReader reader = wals.createStreamReader(fs, filename)) {
543      WAL.Entry entry = reader.next();
544      assertEquals(colCount, entry.getEdit().size());
545      int idx = 0;
546      for (Cell val : entry.getEdit().getCells()) {
547        assertTrue(
548          Bytes.equals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName()));
549        assertTrue(htd.getTableName().equals(entry.getKey().getTableName()));
550        assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(),
551          val.getRowLength()));
552        assertEquals((byte) (idx + '0'), CellUtil.cloneValue(val)[0]);
553        System.out.println(entry.getKey() + " " + val);
554        idx++;
555      }
556    }
557  }
558
559  /**
560   * Test that we can visit entries before they are appended
561   */
562  @Test
563  public void testVisitors() throws Exception {
564    final int COL_COUNT = 10;
565    final TableName tableName = TableName.valueOf(currentTestName);
566    final byte[] row = Bytes.toBytes("row");
567    final DumbWALActionsListener visitor = new DumbWALActionsListener();
568    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
569    long timestamp = EnvironmentEdgeManager.currentTime();
570    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
571    scopes.put(Bytes.toBytes("column"), 0);
572
573    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build();
574    final WAL log = wals.getWAL(hri);
575    log.registerWALActionsListener(visitor);
576    for (int i = 0; i < COL_COUNT; i++) {
577      WALEdit cols = new WALEdit();
578      cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)),
579        timestamp, new byte[] { (byte) (i + '0') }));
580      log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName,
581        EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
582    }
583    log.sync();
584    assertEquals(COL_COUNT, visitor.increments);
585    log.unregisterWALActionsListener(visitor);
586    WALEdit cols = new WALEdit();
587    cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(11)),
588      timestamp, new byte[] { (byte) (11 + '0') }));
589    log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName,
590      EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
591    log.sync();
592    assertEquals(COL_COUNT, visitor.increments);
593  }
594
595  /**
596   * A loaded WAL coprocessor won't break existing WAL test cases.
597   */
598  @Test
599  public void testWALCoprocessorLoaded() throws Exception {
600    // test to see whether the coprocessor is loaded or not.
601    WALCoprocessorHost host = wals.getWAL(null).getCoprocessorHost();
602    Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class);
603    assertNotNull(c);
604  }
605
  /**
   * Listener that simply counts how many WAL entries it is asked to visit; used by
   * {@link #testVisitors()}.
   */
  static class DumbWALActionsListener implements WALActionsListener {
    // Number of visitLogEntryBeforeWrite callbacks observed so far.
    int increments = 0;

    @Override
    public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
      increments++;
    }
  }
614
  @Test
  public void testWALProviders() throws IOException {
    Configuration conf = new Configuration();
    // With nothing configured, the WAL provider and the meta-WAL provider share a class.
    WALFactory walFactory = new WALFactory(conf, this.currentServername, null);
    assertEquals(walFactory.getWALProvider().getClass(), walFactory.getMetaProvider().getClass());

    // if providers are not set and do not enable SyncReplicationWALProvider
    // NOTE(review): this second block is identical to the first; presumably it once exercised a
    // different configuration (e.g. SyncReplicationWALProvider) — confirm whether it can go.
    walFactory = new WALFactory(conf, this.currentServername, null);
    assertEquals(walFactory.getWALProvider().getClass(), walFactory.getMetaProvider().getClass());
  }
625
626  @Test
627  public void testOnlySetWALProvider() throws IOException {
628    Configuration conf = new Configuration();
629    conf.set(WAL_PROVIDER, WALFactory.Providers.multiwal.name());
630    WALFactory walFactory = new WALFactory(conf, this.currentServername, null);
631    // class of WALProvider and metaWALProvider are the same when metaWALProvider is not set
632    assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getWALProvider().getClass());
633    assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getMetaProvider().getClass());
634  }
635
636  @Test
637  public void testOnlySetMetaWALProvider() throws IOException {
638    Configuration conf = new Configuration();
639    conf.set(META_WAL_PROVIDER, WALFactory.Providers.asyncfs.name());
640    WALFactory walFactory = new WALFactory(conf, this.currentServername, null);
641    assertEquals(WALFactory.Providers.defaultProvider.clazz,
642      walFactory.getWALProvider().getClass());
643    assertEquals(WALFactory.Providers.asyncfs.clazz, walFactory.getMetaProvider().getClass());
644  }
645
  /**
   * Verifies that getProviderClass honors the requested default, including when the factory's
   * own default provider method is overridden.
   */
  @Test
  public void testDefaultProvider() throws IOException {
    final Configuration conf = new Configuration();
    // AsyncFSWal is the default, we should be able to request any WAL.
    final WALFactory normalWalFactory = new WALFactory(conf, this.currentServername, null);
    Class<? extends WALProvider> fshLogProvider =
      normalWalFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name());
    assertEquals(Providers.filesystem.clazz, fshLogProvider);

    // Imagine a world where MultiWAL is the default
    final WALFactory customizedWalFactory = new WALFactory(conf, this.currentServername, null) {
      @Override
      Providers getDefaultProvider() {
        return Providers.multiwal;
      }
    };
    // If we don't specify a WALProvider, we should get the default implementation.
    Class<? extends WALProvider> multiwalProviderClass =
      customizedWalFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.multiwal.name());
    assertEquals(Providers.multiwal.clazz, multiwalProviderClass);
  }
667
668  @Test
669  public void testCustomProvider() throws IOException {
670    final Configuration config = new Configuration();
671    config.set(WALFactory.WAL_PROVIDER, IOTestProvider.class.getName());
672    final WALFactory walFactory = new WALFactory(config, this.currentServername, null);
673    Class<? extends WALProvider> walProvider =
674      walFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name());
675    assertEquals(IOTestProvider.class, walProvider);
676    WALProvider metaWALProvider = walFactory.getMetaProvider();
677    assertEquals(IOTestProvider.class, metaWALProvider.getClass());
678  }
679
680  @Test
681  public void testCustomMetaProvider() throws IOException {
682    final Configuration config = new Configuration();
683    config.set(WALFactory.META_WAL_PROVIDER, IOTestProvider.class.getName());
684    final WALFactory walFactory = new WALFactory(config, this.currentServername, null);
685    Class<? extends WALProvider> walProvider =
686      walFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name());
687    assertEquals(Providers.filesystem.clazz, walProvider);
688    WALProvider metaWALProvider = walFactory.getMetaProvider();
689    assertEquals(IOTestProvider.class, metaWALProvider.getClass());
690  }
691
692  @Test
693  public void testCustomReplicationProvider() throws IOException {
694    final Configuration config = new Configuration();
695    config.set(WALFactory.REPLICATION_WAL_PROVIDER, IOTestProvider.class.getName());
696    final WALFactory walFactory = new WALFactory(config, this.currentServername, null);
697    Class<? extends WALProvider> walProvider =
698      walFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name());
699    assertEquals(Providers.filesystem.clazz, walProvider);
700    WALProvider replicationWALProvider = walFactory.getReplicationProvider();
701    assertEquals(IOTestProvider.class, replicationWALProvider.getClass());
702  }
703
704  /**
705   * Confirm that we will use different WALs for hbase:meta and hbase:replication
706   */
707  @Test
708  public void testDifferentWALs() throws IOException {
709    WAL normalWAL = wals.getWAL(null);
710    WAL metaWAL = wals.getWAL(RegionInfoBuilder.FIRST_META_REGIONINFO);
711    WAL replicationWAL = wals.getWAL(RegionInfoBuilder
712      .newBuilder(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT).build());
713    assertNotSame(normalWAL, metaWAL);
714    assertNotSame(normalWAL, replicationWAL);
715    assertNotSame(metaWAL, replicationWAL);
716  }
717
718  @Test
719  public void testReaderClosedOnBadCodec() throws IOException {
720    // Create our own Configuration and WALFactory to avoid breaking other test methods
721    Configuration confWithCodec = new Configuration(conf);
722    confWithCodec.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, BrokenWALCellCodec.class,
723      Codec.class);
724    WALFactory customFactory = new WALFactory(confWithCodec, this.currentServername.toString());
725
726    // Hack a Proxy over the FileSystem so that we can track the InputStreams opened by
727    // the FileSystem and know if close() was called on those InputStreams.
728    List<InputStreamProxy> openedReaders = new ArrayList<>();
729    FileSystemProxy proxyFs = new FileSystemProxy(fs) {
730      @Override
731      public FSDataInputStream open(Path p) throws IOException {
732        InputStreamProxy is = new InputStreamProxy(super.open(p));
733        openedReaders.add(is);
734        return is;
735      }
736
737      @Override
738      public FSDataInputStream open(Path p, int blockSize) throws IOException {
739        InputStreamProxy is = new InputStreamProxy(super.open(p, blockSize));
740        openedReaders.add(is);
741        return is;
742      }
743    };
744
745    final TableDescriptor htd =
746      TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTestName))
747        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build();
748    final RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
749
750    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
751    for (byte[] fam : htd.getColumnFamilyNames()) {
752      scopes.put(fam, 0);
753    }
754    byte[] row = Bytes.toBytes("row");
755    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
756    // Write one column in one edit.
757    WALEdit cols = new WALEdit();
758    cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes("0"),
759      EnvironmentEdgeManager.currentTime(), new byte[] { 0 }));
760    final WAL log = customFactory.getWAL(hri);
761    final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
762      htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
763    // Sync the edit to the WAL
764    log.sync(txid);
765    log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
766    log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
767    log.shutdown();
768
769    // Inject our failure, object is constructed via reflection.
770    BrokenWALCellCodec.THROW_FAILURE_ON_INIT.set(true);
771
772    // Now open a reader on the log which will throw an exception when
773    // we try to instantiate the custom Codec.
774    Path filename = AbstractFSWALProvider.getCurrentFileName(log);
775    assertThrows(IOException.class, () -> customFactory.createStreamReader(proxyFs, filename),
776      "Expected to see an exception when creating WAL reader");
777    // We should have exactly one reader
778    assertEquals(1, openedReaders.size());
779    // And that reader should be closed.
780    long unclosedReaders =
781      openedReaders.stream().filter((r) -> !r.isClosed.get()).collect(Collectors.counting());
782    assertEquals(0, unclosedReaders, "Should not find any open readers");
783  }
784
785  /**
786   * A proxy around FSDataInputStream which can report if close() was called.
787   */
788  private static class InputStreamProxy extends FSDataInputStream {
789    private final InputStream real;
790    private final AtomicBoolean isClosed = new AtomicBoolean(false);
791
792    public InputStreamProxy(InputStream real) {
793      super(real);
794      this.real = real;
795    }
796
797    @Override
798    public void close() throws IOException {
799      isClosed.set(true);
800      real.close();
801    }
802  }
803
804  /**
805   * A custom WALCellCodec in which we can inject failure.
806   */
807  @SuppressWarnings("unused")
808  private static class BrokenWALCellCodec extends WALCellCodec {
809    static final AtomicBoolean THROW_FAILURE_ON_INIT = new AtomicBoolean(false);
810
811    static void maybeInjectFailure() {
812      if (THROW_FAILURE_ON_INIT.get()) {
813        throw new RuntimeException("Injected instantiation exception");
814      }
815    }
816
817    public BrokenWALCellCodec() {
818      super();
819      maybeInjectFailure();
820    }
821
822    public BrokenWALCellCodec(Configuration conf, CompressionContext compression) {
823      super(conf, compression);
824      maybeInjectFailure();
825    }
826  }
827}