/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

/**
 * WAL tests that can be reused across providers.
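 * <p>
 * Concrete subclasses bind {@code W} to a provider-specific writer type and implement the
 * {@code createWriter}, {@code append} and {@code sync} hooks. A minimal sketch for the
 * filesystem-based provider (assuming the HBase 2.x {@code WALProvider.Writer} API; method names
 * and signatures may vary across versions):
 *
 * <pre>
 * public class TestProtobufLog extends AbstractTestProtobufLog&lt;WALProvider.Writer&gt; {
 *   &#64;Override
 *   protected WALProvider.Writer createWriter(Path path) throws IOException {
 *     return wals.createWALWriter(fs, path);
 *   }
 *
 *   &#64;Override
 *   protected void append(WALProvider.Writer writer, WAL.Entry entry) throws IOException {
 *     writer.append(entry);
 *   }
 *
 *   &#64;Override
 *   protected void sync(WALProvider.Writer writer) throws IOException {
 *     writer.sync(false); // forceSync=false; some versions expose a no-arg sync()
 *   }
 * }
 * </pre>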
 */
public abstract class AbstractTestProtobufLog<W extends Closeable> {
  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  protected FileSystem fs;
  protected Path dir;
  protected WALFactory wals;

  @Rule
  public final TestName currentTest = new TestName();

  @Before
  public void setUp() throws Exception {
    fs = TEST_UTIL.getDFSCluster().getFileSystem();
    dir = new Path(TEST_UTIL.createRootDir(), currentTest.getMethodName());
    wals = new WALFactory(TEST_UTIL.getConfiguration(), currentTest.getMethodName());
  }

  @After
  public void tearDown() throws Exception {
    wals.close();
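    // Sweep everything under the mini DFS root so the next test starts from a clean slate.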
    FileStatus[] entries = fs.listStatus(new Path("/"));
    for (FileStatus dir : entries) {
      fs.delete(dir.getPath(), true);
    }
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Make block sizes small.
    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
    // Quicker heartbeat interval for faster DN death notification.
    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);

    // faster failover with cluster.shutdown(); fs.close() idiom
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500);
    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
      SampleRegionWALCoprocessor.class.getName());
    TEST_UTIL.startMiniDFSCluster(3);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Reads the WAL with and without a WALTrailer.
   */
  @Test
  public void testWALTrailer() throws IOException {
    // Read with trailer.
    doRead(true);
    // Read without trailer.
    doRead(false);
  }

  /**
   * Appends entries to the WAL and then reads them back.
   * @param withTrailer If true, closes the WAL writer before reading so that a trailer is
   *                    appended to the WAL. Otherwise, reading starts right after the sync call,
   *                    which means the reader is not aware of the trailer. In this scenario, if
   *                    the reader tries to read the trailer in its next() call,
   *                    ProtobufLogReader returns false.
   */
  private void doRead(boolean withTrailer) throws IOException {
    final int columnCount = 5;
    final int recordCount = 5;
    final TableName tableName = TableName.valueOf("tablename");
    final byte[] row = Bytes.toBytes("row");
    long timestamp = EnvironmentEdgeManager.currentTime();
    Path path = new Path(dir, "tempwal");
    // Delete the log if it already exists; for tests only.
    fs.delete(path, true);
    W writer = null;
    ProtobufLogReader reader = null;
    try {
      HRegionInfo hri =
        new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      HTableDescriptor htd = new HTableDescriptor(tableName);
      fs.mkdirs(dir);
      // Write log in pb format.
      writer = createWriter(path);
      for (int i = 0; i < recordCount; ++i) {
        WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, i, timestamp,
          HConstants.DEFAULT_CLUSTER_ID);
        WALEdit edit = new WALEdit();
        for (int j = 0; j < columnCount; ++j) {
          if (i == 0) {
            htd.addFamily(new HColumnDescriptor("column" + j));
          }
          String value = i + "" + j;
          edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value)));
        }
        append(writer, new WAL.Entry(key, edit));
      }
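      // Sync so appends are visible to a reader even when no trailer is written.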
      sync(writer);
      if (withTrailer) {
        writer.close();
      }

      // Now read the log using standard means.
      reader = (ProtobufLogReader) wals.createReader(fs, path);
      if (withTrailer) {
        assertNotNull(reader.trailer);
      } else {
        assertNull(reader.trailer);
      }
      for (int i = 0; i < recordCount; ++i) {
        WAL.Entry entry = reader.next();
        assertNotNull(entry);
        assertEquals(columnCount, entry.getEdit().size());
        assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
        assertEquals(tableName, entry.getKey().getTableName());
        int idx = 0;
        for (Cell val : entry.getEdit().getCells()) {
          assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(),
            val.getRowLength()));
          String value = i + "" + idx;
          assertArrayEquals(Bytes.toBytes(value), CellUtil.cloneValue(val));
          idx++;
        }
      }
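      // All records consumed; the next read must report end-of-log.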
      WAL.Entry entry = reader.next();
      assertNull(entry);
    } finally {
      if (writer != null) {
        writer.close();
      }
      if (reader != null) {
        reader.close();
      }
    }
  }

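  /** Creates a provider-specific writer for the given path. */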
  protected abstract W createWriter(Path path) throws IOException;

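  /** Appends the given entry through the provider-specific writer. */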
  protected abstract void append(W writer, WAL.Entry entry) throws IOException;

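  /** Flushes/syncs outstanding appends so a reader can see them. */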
  protected abstract void sync(W writer) throws IOException;
}