001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static org.apache.hadoop.hbase.wal.WALFactory.META_WAL_PROVIDER;
021import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertNotNull;
025import static org.junit.Assert.assertTrue;
026import static org.junit.Assert.fail;
027
028import java.io.IOException;
029import java.io.InputStream;
030import java.lang.reflect.Method;
031import java.net.BindException;
032import java.util.ArrayList;
033import java.util.List;
034import java.util.NavigableMap;
035import java.util.TreeMap;
036import java.util.concurrent.atomic.AtomicBoolean;
037import java.util.stream.Collectors;
038
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.fs.FSDataInputStream;
041import org.apache.hadoop.fs.FSDataOutputStream;
042import org.apache.hadoop.fs.FileStatus;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.hbase.Cell;
046import org.apache.hadoop.hbase.CellUtil;
047import org.apache.hadoop.hbase.Coprocessor;
048import org.apache.hadoop.hbase.HBaseClassTestRule;
049import org.apache.hadoop.hbase.HBaseTestingUtil;
050import org.apache.hadoop.hbase.HConstants;
051import org.apache.hadoop.hbase.KeyValue;
052import org.apache.hadoop.hbase.ServerName;
053import org.apache.hadoop.hbase.TableName;
054import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
055import org.apache.hadoop.hbase.client.RegionInfo;
056import org.apache.hadoop.hbase.client.RegionInfoBuilder;
057import org.apache.hadoop.hbase.client.TableDescriptor;
058import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
059import org.apache.hadoop.hbase.codec.Codec;
060import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
061import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
062import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
063import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
064import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
065import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
066import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
067import org.apache.hadoop.hbase.testclassification.MediumTests;
068import org.apache.hadoop.hbase.testclassification.RegionServerTests;
069import org.apache.hadoop.hbase.util.Bytes;
070import org.apache.hadoop.hbase.util.CommonFSUtils;
071import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
072import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
073import org.apache.hadoop.hbase.util.Threads;
074import org.apache.hadoop.hbase.wal.WALFactory.Providers;
075import org.apache.hadoop.hdfs.DistributedFileSystem;
076import org.apache.hadoop.hdfs.MiniDFSCluster;
077import org.apache.hadoop.hdfs.protocol.HdfsConstants;
078import org.junit.After;
079import org.junit.AfterClass;
080import org.junit.Before;
081import org.junit.BeforeClass;
082import org.junit.ClassRule;
083import org.junit.Rule;
084import org.junit.Test;
085import org.junit.experimental.categories.Category;
086import org.junit.rules.TestName;
087import org.slf4j.Logger;
088import org.slf4j.LoggerFactory;
089
090/**
091 * WAL tests that can be reused across providers.
092 */
093@Category({RegionServerTests.class, MediumTests.class})
094public class TestWALFactory {
095
  /** Class rule created through {@code HBaseClassTestRule.forClass} for this test class. */
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestWALFactory.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestWALFactory.class);

  // Shared configuration; assigned from TEST_UTIL in setUpBeforeClass().
  protected static Configuration conf;
  // Mini DFS cluster shared by all tests; testAppendClose() replaces it with a restarted one.
  private static MiniDFSCluster cluster;
  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  // Root directory and WAL root directory, both created once in setUpBeforeClass().
  protected static Path hbaseDir;
  protected static Path hbaseWALDir;

  // Per-test state, (re)initialised in setUp().
  protected FileSystem fs;
  protected Path dir;
  protected WALFactory wals;
  private ServerName currentServername;

  /** Supplies the running test method's name, used to derive per-test paths and names. */
  @Rule
  public final TestName currentTest = new TestName();
115
116  @Before
117  public void setUp() throws Exception {
118    fs = cluster.getFileSystem();
119    dir = new Path(hbaseDir, currentTest.getMethodName());
120    this.currentServername = ServerName.valueOf(currentTest.getMethodName(), 16010, 1);
121    wals = new WALFactory(conf, this.currentServername.toString());
122  }
123
124  @After
125  public void tearDown() throws Exception {
126    // testAppendClose closes the FileSystem, which will prevent us from closing cleanly here.
127    try {
128      wals.close();
129    } catch (IOException exception) {
130      LOG.warn("Encountered exception while closing wal factory. If you have other errors, this" +
131          " may be the cause. Message: " + exception);
132      LOG.debug("Exception details for failure to close wal factory.", exception);
133    }
134    FileStatus[] entries = fs.listStatus(new Path("/"));
135    for (FileStatus dir : entries) {
136      fs.delete(dir.getPath(), true);
137    }
138  }
139
140  @BeforeClass
141  public static void setUpBeforeClass() throws Exception {
142    CommonFSUtils.setWALRootDir(TEST_UTIL.getConfiguration(), new Path("file:///tmp/wal"));
143    // Make block sizes small.
144    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
145    // needed for testAppendClose()
146    // quicker heartbeat interval for faster DN death notification
147    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
148    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
149    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
150
151    // faster failover with cluster.shutdown();fs.close() idiom
152    TEST_UTIL.getConfiguration()
153        .setInt("hbase.ipc.client.connect.max.retries", 1);
154    TEST_UTIL.getConfiguration().setInt(
155        "dfs.client.block.recovery.retries", 1);
156    TEST_UTIL.getConfiguration().setInt(
157      "hbase.ipc.client.connection.maxidletime", 500);
158    TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000);
159    TEST_UTIL.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000);
160    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
161        SampleRegionWALCoprocessor.class.getName());
162    TEST_UTIL.startMiniDFSCluster(3);
163
164    conf = TEST_UTIL.getConfiguration();
165    cluster = TEST_UTIL.getDFSCluster();
166
167    hbaseDir = TEST_UTIL.createRootDir();
168    hbaseWALDir = TEST_UTIL.createWALRootDir();
169  }
170
171  @AfterClass
172  public static void tearDownAfterClass() throws Exception {
173    TEST_UTIL.shutdownMiniCluster();
174  }
175
176  @Test
177  public void canCloseSingleton() throws IOException {
178    WALFactory.getInstance(conf).close();
179  }
180
181  /**
182   * Just write multiple logs then split.  Before fix for HADOOP-2283, this
183   * would fail.
184   * @throws IOException
185   */
186  @Test
187  public void testSplit() throws IOException {
188    final TableName tableName = TableName.valueOf(currentTest.getMethodName());
189    final byte [] rowName = tableName.getName();
190    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
191    final int howmany = 3;
192    RegionInfo[] infos = new RegionInfo[3];
193    Path tableDataDir = CommonFSUtils.getTableDir(hbaseDir, tableName);
194    fs.mkdirs(tableDataDir);
195    Path tabledir = CommonFSUtils.getWALTableDir(conf, tableName);
196    fs.mkdirs(tabledir);
197    for (int i = 0; i < howmany; i++) {
198      infos[i] = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i))
199          .setEndKey(Bytes.toBytes("" + (i + 1))).build();
200      fs.mkdirs(new Path(tabledir, infos[i].getEncodedName()));
201      fs.mkdirs(new Path(tableDataDir, infos[i].getEncodedName()));
202      LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString());
203    }
204    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
205    scopes.put(Bytes.toBytes("column"), 0);
206
207
208    // Add edits for three regions.
209    for (int ii = 0; ii < howmany; ii++) {
210      for (int i = 0; i < howmany; i++) {
211        final WAL log =
212            wals.getWAL(infos[i]);
213        for (int j = 0; j < howmany; j++) {
214          WALEdit edit = new WALEdit();
215          byte [] family = Bytes.toBytes("column");
216          byte [] qualifier = Bytes.toBytes(Integer.toString(j));
217          byte [] column = Bytes.toBytes("column:" + Integer.toString(j));
218          edit.add(new KeyValue(rowName, family, qualifier,
219            EnvironmentEdgeManager.currentTime(), column));
220          LOG.info("Region " + i + ": " + edit);
221          WALKeyImpl walKey =  new WALKeyImpl(infos[i].getEncodedNameAsBytes(), tableName,
222            EnvironmentEdgeManager.currentTime(), mvcc, scopes);
223          log.appendData(infos[i], walKey, edit);
224          walKey.getWriteEntry();
225        }
226        log.sync();
227        log.rollWriter(true);
228      }
229    }
230    wals.shutdown();
231    // The below calculation of logDir relies on insider information... WALSplitter should be connected better
232    // with the WAL system.... not requiring explicit path. The oldLogDir is just made up not used.
233    Path logDir =
234        new Path(new Path(hbaseWALDir, HConstants.HREGION_LOGDIR_NAME),
235            this.currentServername.toString());
236    Path oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
237    List<Path> splits = WALSplitter.split(hbaseWALDir, logDir, oldLogDir, fs, conf, wals);
238    verifySplits(splits, howmany);
239  }
240
241  /**
242   * Test new HDFS-265 sync.
243   * @throws Exception
244   */
245  @Test
246  public void Broken_testSync() throws Exception {
247    TableName tableName = TableName.valueOf(currentTest.getMethodName());
248    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
249    // First verify that using streams all works.
250    Path p = new Path(dir, currentTest.getMethodName() + ".fsdos");
251    FSDataOutputStream out = fs.create(p);
252    out.write(tableName.getName());
253    Method syncMethod = null;
254    try {
255      syncMethod = out.getClass().getMethod("hflush", new Class<?> []{});
256    } catch (NoSuchMethodException e) {
257      try {
258        syncMethod = out.getClass().getMethod("sync", new Class<?> []{});
259      } catch (NoSuchMethodException ex) {
260        fail("This version of Hadoop supports neither Syncable.sync() " +
261            "nor Syncable.hflush().");
262      }
263    }
264    syncMethod.invoke(out, new Object[]{});
265    FSDataInputStream in = fs.open(p);
266    assertTrue(in.available() > 0);
267    byte [] buffer = new byte [1024];
268    int read = in.read(buffer);
269    assertEquals(tableName.getName().length, read);
270    out.close();
271    in.close();
272
273    final int total = 20;
274    WAL.Reader reader = null;
275
276    try {
277      RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
278      NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
279      scopes.put(tableName.getName(), 0);
280      final WAL wal = wals.getWAL(info);
281
282      for (int i = 0; i < total; i++) {
283        WALEdit kvs = new WALEdit();
284        kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
285        wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
286          EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
287      }
288      // Now call sync and try reading.  Opening a Reader before you sync just
289      // gives you EOFE.
290      wal.sync();
291      // Open a Reader.
292      Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
293      reader = wals.createReader(fs, walPath);
294      int count = 0;
295      WAL.Entry entry = new WAL.Entry();
296      while ((entry = reader.next(entry)) != null) count++;
297      assertEquals(total, count);
298      reader.close();
299      // Add test that checks to see that an open of a Reader works on a file
300      // that has had a sync done on it.
301      for (int i = 0; i < total; i++) {
302        WALEdit kvs = new WALEdit();
303        kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
304        wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
305          EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
306      }
307      wal.sync();
308      reader = wals.createReader(fs, walPath);
309      count = 0;
310      while((entry = reader.next(entry)) != null) count++;
311      assertTrue(count >= total);
312      reader.close();
313      // If I sync, should see double the edits.
314      wal.sync();
315      reader = wals.createReader(fs, walPath);
316      count = 0;
317      while((entry = reader.next(entry)) != null) count++;
318      assertEquals(total * 2, count);
319      reader.close();
320      // Now do a test that ensures stuff works when we go over block boundary,
321      // especially that we return good length on file.
322      final byte [] value = new byte[1025 * 1024];  // Make a 1M value.
323      for (int i = 0; i < total; i++) {
324        WALEdit kvs = new WALEdit();
325        kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
326        wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
327          EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
328      }
329      // Now I should have written out lots of blocks.  Sync then read.
330      wal.sync();
331      reader = wals.createReader(fs, walPath);
332      count = 0;
333      while((entry = reader.next(entry)) != null) count++;
334      assertEquals(total * 3, count);
335      reader.close();
336      // shutdown and ensure that Reader gets right length also.
337      wal.shutdown();
338      reader = wals.createReader(fs, walPath);
339      count = 0;
340      while((entry = reader.next(entry)) != null) count++;
341      assertEquals(total * 3, count);
342      reader.close();
343    } finally {
344      if (reader != null) reader.close();
345    }
346  }
347
348  private void verifySplits(final List<Path> splits, final int howmany)
349  throws IOException {
350    assertEquals(howmany * howmany, splits.size());
351    for (int i = 0; i < splits.size(); i++) {
352      LOG.info("Verifying=" + splits.get(i));
353      WAL.Reader reader = wals.createReader(fs, splits.get(i));
354      try {
355        int count = 0;
356        String previousRegion = null;
357        long seqno = -1;
358        WAL.Entry entry = new WAL.Entry();
359        while((entry = reader.next(entry)) != null) {
360          WALKey key = entry.getKey();
361          String region = Bytes.toString(key.getEncodedRegionName());
362          // Assert that all edits are for same region.
363          if (previousRegion != null) {
364            assertEquals(previousRegion, region);
365          }
366          LOG.info("oldseqno=" + seqno + ", newseqno=" + key.getSequenceId());
367          assertTrue(seqno < key.getSequenceId());
368          seqno = key.getSequenceId();
369          previousRegion = region;
370          count++;
371        }
372        assertEquals(howmany, count);
373      } finally {
374        reader.close();
375      }
376    }
377  }
378
379  /*
380   * We pass different values to recoverFileLease() so that different code paths are covered
381   *
382   * For this test to pass, requires:
383   * 1. HDFS-200 (append support)
384   * 2. HDFS-988 (SafeMode should freeze file operations
385   *              [FSNamesystem.nextGenerationStampForBlock])
386   * 3. HDFS-142 (on restart, maintain pendingCreates)
387   */
388  @Test
389  public void testAppendClose() throws Exception {
390    TableName tableName =
391        TableName.valueOf(currentTest.getMethodName());
392    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
393
394    WAL wal = wals.getWAL(regionInfo);
395    int total = 20;
396
397    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
398    scopes.put(tableName.getName(), 0);
399    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
400    for (int i = 0; i < total; i++) {
401      WALEdit kvs = new WALEdit();
402      kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
403      wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
404        EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
405    }
406    // Now call sync to send the data to HDFS datanodes
407    wal.sync();
408     int namenodePort = cluster.getNameNodePort();
409    final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
410
411
412    // Stop the cluster.  (ensure restart since we're sharing MiniDFSCluster)
413    try {
414      DistributedFileSystem dfs = cluster.getFileSystem();
415      dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
416      TEST_UTIL.shutdownMiniDFSCluster();
417      try {
418        // wal.writer.close() will throw an exception,
419        // but still call this since it closes the LogSyncer thread first
420        wal.shutdown();
421      } catch (IOException e) {
422        LOG.info(e.toString(), e);
423      }
424      fs.close(); // closing FS last so DFSOutputStream can't call close
425      LOG.info("STOPPED first instance of the cluster");
426    } finally {
427      // Restart the cluster
428      while (cluster.isClusterUp()){
429        LOG.error("Waiting for cluster to go down");
430        Thread.sleep(1000);
431      }
432      assertFalse(cluster.isClusterUp());
433      cluster = null;
434      for (int i = 0; i < 100; i++) {
435        try {
436          cluster = TEST_UTIL.startMiniDFSClusterForTestWAL(namenodePort);
437          break;
438        } catch (BindException e) {
439          LOG.info("Sleeping.  BindException bringing up new cluster");
440          Threads.sleep(1000);
441        }
442      }
443      cluster.waitActive();
444      fs = cluster.getFileSystem();
445      LOG.info("STARTED second instance.");
446    }
447
448    // set the lease period to be 1 second so that the
449    // namenode triggers lease recovery upon append request
450    Method setLeasePeriod = cluster.getClass()
451      .getDeclaredMethod("setLeasePeriod", new Class[]{Long.TYPE, Long.TYPE});
452    setLeasePeriod.setAccessible(true);
453    setLeasePeriod.invoke(cluster, 1000L, 1000L);
454    try {
455      Thread.sleep(1000);
456    } catch (InterruptedException e) {
457      LOG.info(e.toString(), e);
458    }
459
460    // Now try recovering the log, like the HMaster would do
461    final FileSystem recoveredFs = fs;
462    final Configuration rlConf = conf;
463
464    class RecoverLogThread extends Thread {
465      public Exception exception = null;
466
467      @Override
468      public void run() {
469        try {
470          RecoverLeaseFSUtils.recoverFileLease(recoveredFs, walPath, rlConf, null);
471        } catch (IOException e) {
472          exception = e;
473        }
474      }
475    }
476
477    RecoverLogThread t = new RecoverLogThread();
478    t.start();
479    // Timeout after 60 sec. Without correct patches, would be an infinite loop
480    t.join(60 * 1000);
481    if(t.isAlive()) {
482      t.interrupt();
483      throw new Exception("Timed out waiting for WAL.recoverLog()");
484    }
485
486    if (t.exception != null)
487      throw t.exception;
488
489    // Make sure you can read all the content
490    WAL.Reader reader = wals.createReader(fs, walPath);
491    int count = 0;
492    WAL.Entry entry = new WAL.Entry();
493    while (reader.next(entry) != null) {
494      count++;
495      assertTrue("Should be one KeyValue per WALEdit",
496                  entry.getEdit().getCells().size() == 1);
497    }
498    assertEquals(total, count);
499    reader.close();
500
501    // Reset the lease period
502    setLeasePeriod.invoke(cluster, new Object[]{ 60000L, 3600000L });
503  }
504
505  /**
506   * Tests that we can write out an edit, close, and then read it back in again.
507   */
508  @Test
509  public void testEditAdd() throws IOException {
510    int colCount = 10;
511    TableDescriptor htd =
512        TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName()))
513            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build();
514    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
515    for (byte[] fam : htd.getColumnFamilyNames()) {
516      scopes.put(fam, 0);
517    }
518    byte[] row = Bytes.toBytes("row");
519    WAL.Reader reader = null;
520    try {
521      final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
522
523      // Write columns named 1, 2, 3, etc. and then values of single byte
524      // 1, 2, 3...
525      long timestamp = EnvironmentEdgeManager.currentTime();
526      WALEdit cols = new WALEdit();
527      for (int i = 0; i < colCount; i++) {
528        cols.add(new KeyValue(row, Bytes.toBytes("column"),
529            Bytes.toBytes(Integer.toString(i)),
530          timestamp, new byte[] { (byte)(i + '0') }));
531      }
532      RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(row)
533          .setEndKey(Bytes.toBytes(Bytes.toString(row) + "1")).build();
534      final WAL log = wals.getWAL(info);
535
536      final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(),
537        htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
538      log.sync(txid);
539      log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
540      log.completeCacheFlush(info.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
541      log.shutdown();
542      Path filename = AbstractFSWALProvider.getCurrentFileName(log);
543      // Now open a reader on the log and assert append worked.
544      reader = wals.createReader(fs, filename);
545      // Above we added all columns on a single row so we only read one
546      // entry in the below... thats why we have '1'.
547      for (int i = 0; i < 1; i++) {
548        WAL.Entry entry = reader.next(null);
549        if (entry == null) break;
550        WALKey key = entry.getKey();
551        WALEdit val = entry.getEdit();
552        assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
553        assertTrue(htd.getTableName().equals(key.getTableName()));
554        Cell cell = val.getCells().get(0);
555        assertTrue(Bytes.equals(row, 0, row.length, cell.getRowArray(), cell.getRowOffset(),
556          cell.getRowLength()));
557        assertEquals((byte)(i + '0'), CellUtil.cloneValue(cell)[0]);
558        System.out.println(key + " " + val);
559      }
560    } finally {
561      if (reader != null) {
562        reader.close();
563      }
564    }
565  }
566
567  @Test
568  public void testAppend() throws IOException {
569    int colCount = 10;
570    TableDescriptor htd =
571        TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName()))
572            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build();
573    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
574    for (byte[] fam : htd.getColumnFamilyNames()) {
575      scopes.put(fam, 0);
576    }
577    byte[] row = Bytes.toBytes("row");
578    WAL.Reader reader = null;
579    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
580    try {
581      // Write columns named 1, 2, 3, etc. and then values of single byte
582      // 1, 2, 3...
583      long timestamp = EnvironmentEdgeManager.currentTime();
584      WALEdit cols = new WALEdit();
585      for (int i = 0; i < colCount; i++) {
586        cols.add(new KeyValue(row, Bytes.toBytes("column"),
587          Bytes.toBytes(Integer.toString(i)),
588          timestamp, new byte[] { (byte)(i + '0') }));
589      }
590      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
591      final WAL log = wals.getWAL(hri);
592      final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
593        htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
594      log.sync(txid);
595      log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
596      log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
597      log.shutdown();
598      Path filename = AbstractFSWALProvider.getCurrentFileName(log);
599      // Now open a reader on the log and assert append worked.
600      reader = wals.createReader(fs, filename);
601      WAL.Entry entry = reader.next();
602      assertEquals(colCount, entry.getEdit().size());
603      int idx = 0;
604      for (Cell val : entry.getEdit().getCells()) {
605        assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(),
606          entry.getKey().getEncodedRegionName()));
607        assertTrue(htd.getTableName().equals(entry.getKey().getTableName()));
608        assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(),
609          val.getRowLength()));
610        assertEquals((byte) (idx + '0'), CellUtil.cloneValue(val)[0]);
611        System.out.println(entry.getKey() + " " + val);
612        idx++;
613      }
614    } finally {
615      if (reader != null) {
616        reader.close();
617      }
618    }
619  }
620
621  /**
622   * Test that we can visit entries before they are appended
623   * @throws Exception
624   */
625  @Test
626  public void testVisitors() throws Exception {
627    final int COL_COUNT = 10;
628    final TableName tableName = TableName.valueOf(currentTest.getMethodName());
629    final byte [] row = Bytes.toBytes("row");
630    final DumbWALActionsListener visitor = new DumbWALActionsListener();
631    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
632    long timestamp = EnvironmentEdgeManager.currentTime();
633    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
634    scopes.put(Bytes.toBytes("column"), 0);
635
636    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build();
637    final WAL log = wals.getWAL(hri);
638    log.registerWALActionsListener(visitor);
639    for (int i = 0; i < COL_COUNT; i++) {
640      WALEdit cols = new WALEdit();
641      cols.add(new KeyValue(row, Bytes.toBytes("column"),
642          Bytes.toBytes(Integer.toString(i)),
643          timestamp, new byte[]{(byte) (i + '0')}));
644      log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName,
645        EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
646    }
647    log.sync();
648    assertEquals(COL_COUNT, visitor.increments);
649    log.unregisterWALActionsListener(visitor);
650    WALEdit cols = new WALEdit();
651    cols.add(new KeyValue(row, Bytes.toBytes("column"),
652        Bytes.toBytes(Integer.toString(11)),
653        timestamp, new byte[]{(byte) (11 + '0')}));
654    log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName,
655      EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
656    log.sync();
657    assertEquals(COL_COUNT, visitor.increments);
658  }
659
660  /**
661   * A loaded WAL coprocessor won't break existing WAL test cases.
662   */
663  @Test
664  public void testWALCoprocessorLoaded() throws Exception {
665    // test to see whether the coprocessor is loaded or not.
666    WALCoprocessorHost host = wals.getWAL(null).getCoprocessorHost();
667    Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class);
668    assertNotNull(c);
669  }
670
671  static class DumbWALActionsListener implements WALActionsListener {
672    int increments = 0;
673
674    @Override
675    public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
676      increments++;
677    }
678  }
679
680  @Test
681  public void testWALProviders() throws IOException {
682    Configuration conf = new Configuration();
683    // if providers are not set but enable SyncReplicationWALProvider by default for master node
684    // with not only system tables
685    WALFactory walFactory = new WALFactory(conf, this.currentServername.toString());
686    assertEquals(SyncReplicationWALProvider.class, walFactory.getWALProvider().getClass());
687    WALProvider wrappedWALProvider = ((SyncReplicationWALProvider) walFactory.getWALProvider())
688        .getWrappedProvider();
689    assertEquals(wrappedWALProvider.getClass(), walFactory.getMetaProvider().getClass());
690
691    // if providers are not set and do not enable SyncReplicationWALProvider
692    walFactory = new WALFactory(conf, this.currentServername.toString(), null, false);
693    assertEquals(walFactory.getWALProvider().getClass(), walFactory.getMetaProvider().getClass());
694  }
695
696  @Test
697  public void testOnlySetWALProvider() throws IOException {
698    Configuration conf = new Configuration();
699    conf.set(WAL_PROVIDER, WALFactory.Providers.multiwal.name());
700    WALFactory walFactory = new WALFactory(conf, this.currentServername.toString());
701    WALProvider wrappedWALProvider = ((SyncReplicationWALProvider) walFactory.getWALProvider())
702        .getWrappedProvider();
703
704    assertEquals(SyncReplicationWALProvider.class, walFactory.getWALProvider().getClass());
705    // class of WALProvider and metaWALProvider are the same when metaWALProvider is not set
706    assertEquals(WALFactory.Providers.multiwal.clazz, wrappedWALProvider.getClass());
707    assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getMetaProvider().getClass());
708  }
709
710  @Test
711  public void testOnlySetMetaWALProvider() throws IOException {
712    Configuration conf = new Configuration();
713    conf.set(META_WAL_PROVIDER, WALFactory.Providers.asyncfs.name());
714    WALFactory walFactory = new WALFactory(conf, this.currentServername.toString());
715    WALProvider wrappedWALProvider = ((SyncReplicationWALProvider) walFactory.getWALProvider())
716        .getWrappedProvider();
717
718    assertEquals(SyncReplicationWALProvider.class, walFactory.getWALProvider().getClass());
719    assertEquals(WALFactory.Providers.defaultProvider.clazz, wrappedWALProvider.getClass());
720    assertEquals(WALFactory.Providers.asyncfs.clazz, walFactory.getMetaProvider().getClass());
721  }
722
723  @Test
724  public void testDefaultProvider() throws IOException {
725    final Configuration conf = new Configuration();
726    // AsyncFSWal is the default, we should be able to request any WAL.
727    final WALFactory normalWalFactory = new WALFactory(conf, this.currentServername.toString());
728    Class<? extends WALProvider> fshLogProvider = normalWalFactory.getProviderClass(
729        WALFactory.WAL_PROVIDER, Providers.filesystem.name());
730    assertEquals(Providers.filesystem.clazz, fshLogProvider);
731
732    // Imagine a world where MultiWAL is the default
733    final WALFactory customizedWalFactory = new WALFactory(
734        conf, this.currentServername.toString())  {
735      @Override
736      Providers getDefaultProvider() {
737        return Providers.multiwal;
738      }
739    };
740    // If we don't specify a WALProvider, we should get the default implementation.
741    Class<? extends WALProvider> multiwalProviderClass = customizedWalFactory.getProviderClass(
742        WALFactory.WAL_PROVIDER, Providers.multiwal.name());
743    assertEquals(Providers.multiwal.clazz, multiwalProviderClass);
744  }
745
746  @Test
747  public void testCustomProvider() throws IOException {
748    final Configuration config = new Configuration();
749    config.set(WALFactory.WAL_PROVIDER, IOTestProvider.class.getName());
750    final WALFactory walFactory = new WALFactory(config, this.currentServername.toString());
751    Class<? extends WALProvider> walProvider = walFactory.getProviderClass(
752        WALFactory.WAL_PROVIDER, Providers.filesystem.name());
753    assertEquals(IOTestProvider.class, walProvider);
754    WALProvider metaWALProvider = walFactory.getMetaProvider();
755    assertEquals(IOTestProvider.class, metaWALProvider.getClass());
756  }
757
758  @Test
759  public void testCustomMetaProvider() throws IOException {
760    final Configuration config = new Configuration();
761    config.set(WALFactory.META_WAL_PROVIDER, IOTestProvider.class.getName());
762    final WALFactory walFactory = new WALFactory(config, this.currentServername.toString());
763    Class<? extends WALProvider> walProvider = walFactory.getProviderClass(
764        WALFactory.WAL_PROVIDER, Providers.filesystem.name());
765    assertEquals(Providers.filesystem.clazz, walProvider);
766    WALProvider metaWALProvider = walFactory.getMetaProvider();
767    assertEquals(IOTestProvider.class, metaWALProvider.getClass());
768  }
769
770  @Test
771  public void testReaderClosedOnBadCodec() throws IOException {
772    // Create our own Configuration and WALFactory to avoid breaking other test methods
773    Configuration confWithCodec = new Configuration(conf);
774    confWithCodec.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, BrokenWALCellCodec.class, Codec.class);
775    WALFactory customFactory = new WALFactory(confWithCodec, this.currentServername.toString());
776
777    // Hack a Proxy over the FileSystem so that we can track the InputStreams opened by
778    // the FileSystem and know if close() was called on those InputStreams.
779    List<InputStreamProxy> openedReaders = new ArrayList<>();
780    FileSystemProxy proxyFs = new FileSystemProxy(fs) {
781      @Override
782      public FSDataInputStream open(Path p) throws IOException {
783        InputStreamProxy is = new InputStreamProxy(super.open(p));
784        openedReaders.add(is);
785        return is;
786      }
787
788      @Override
789      public FSDataInputStream open(Path p, int blockSize) throws IOException {
790        InputStreamProxy is = new InputStreamProxy(super.open(p, blockSize));
791        openedReaders.add(is);
792        return is;
793      }
794    };
795
796    final TableDescriptor htd =
797        TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName()))
798            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build();
799    final RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
800
801    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
802    for (byte[] fam : htd.getColumnFamilyNames()) {
803      scopes.put(fam, 0);
804    }
805    byte[] row = Bytes.toBytes("row");
806    WAL.Reader reader = null;
807    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
808    try {
809      // Write one column in one edit.
810      WALEdit cols = new WALEdit();
811      cols.add(new KeyValue(row, Bytes.toBytes("column"),
812        Bytes.toBytes("0"), EnvironmentEdgeManager.currentTime(), new byte[] { 0 }));
813      final WAL log = customFactory.getWAL(hri);
814      final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
815        htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
816      // Sync the edit to the WAL
817      log.sync(txid);
818      log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
819      log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
820      log.shutdown();
821
822      // Inject our failure, object is constructed via reflection.
823      BrokenWALCellCodec.THROW_FAILURE_ON_INIT.set(true);
824
825      // Now open a reader on the log which will throw an exception when
826      // we try to instantiate the custom Codec.
827      Path filename = AbstractFSWALProvider.getCurrentFileName(log);
828      try {
829        reader = customFactory.createReader(proxyFs, filename);
830        fail("Expected to see an exception when creating WAL reader");
831      } catch (Exception e) {
832        // Expected that we get an exception
833      }
834      // We should have exactly one reader
835      assertEquals(1, openedReaders.size());
836      // And that reader should be closed.
837      long unclosedReaders = openedReaders.stream()
838          .filter((r) -> !r.isClosed.get())
839          .collect(Collectors.counting());
840      assertEquals("Should not find any open readers", 0, (int) unclosedReaders);
841    } finally {
842      if (reader != null) {
843        reader.close();
844      }
845    }
846  }
847
848  /**
849   * A proxy around FSDataInputStream which can report if close() was called.
850   */
851  private static class InputStreamProxy extends FSDataInputStream {
852    private final InputStream real;
853    private final AtomicBoolean isClosed = new AtomicBoolean(false);
854
855    public InputStreamProxy(InputStream real) {
856      super(real);
857      this.real = real;
858    }
859
860    @Override
861    public void close() throws IOException {
862      isClosed.set(true);
863      real.close();
864    }
865  }
866
867  /**
868   * A custom WALCellCodec in which we can inject failure.
869   */
870  @SuppressWarnings("unused")
871  private static class BrokenWALCellCodec extends WALCellCodec {
872    static final AtomicBoolean THROW_FAILURE_ON_INIT = new AtomicBoolean(false);
873
874    static void maybeInjectFailure() {
875      if (THROW_FAILURE_ON_INIT.get()) {
876        throw new RuntimeException("Injected instantiation exception");
877      }
878    }
879
880    public BrokenWALCellCodec() {
881      super();
882      maybeInjectFailure();
883    }
884
885    public BrokenWALCellCodec(Configuration conf, CompressionContext compression) {
886      super(conf, compression);
887      maybeInjectFailure();
888    }
889  }
890}