001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver.wal;
020
021import static org.junit.Assert.assertArrayEquals;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertNull;
025import static org.junit.Assert.assertTrue;
026
027import java.io.Closeable;
028import java.io.IOException;
029import org.apache.hadoop.fs.FileStatus;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellUtil;
034import org.apache.hadoop.hbase.HBaseTestingUtility;
035import org.apache.hadoop.hbase.HColumnDescriptor;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.HRegionInfo;
038import org.apache.hadoop.hbase.HTableDescriptor;
039import org.apache.hadoop.hbase.KeyValue;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
042import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.hbase.wal.WAL;
045import org.apache.hadoop.hbase.wal.WALEdit;
046import org.apache.hadoop.hbase.wal.WALFactory;
047import org.apache.hadoop.hbase.wal.WALKeyImpl;
048import org.junit.After;
049import org.junit.AfterClass;
050import org.junit.Before;
051import org.junit.BeforeClass;
052import org.junit.Rule;
053import org.junit.Test;
054import org.junit.rules.TestName;
055
056/**
057 * WAL tests that can be reused across providers.
058 */
059public abstract class AbstractTestProtobufLog<W extends Closeable> {
060  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
061
062  protected FileSystem fs;
063  protected Path dir;
064  protected WALFactory wals;
065
066  @Rule
067  public final TestName currentTest = new TestName();
068
069  @Before
070  public void setUp() throws Exception {
071    fs = TEST_UTIL.getDFSCluster().getFileSystem();
072    dir = new Path(TEST_UTIL.createRootDir(), currentTest.getMethodName());
073    wals = new WALFactory(TEST_UTIL.getConfiguration(), currentTest.getMethodName());
074  }
075
076  @After
077  public void tearDown() throws Exception {
078    wals.close();
079    FileStatus[] entries = fs.listStatus(new Path("/"));
080    for (FileStatus dir : entries) {
081      fs.delete(dir.getPath(), true);
082    }
083  }
084
085  @BeforeClass
086  public static void setUpBeforeClass() throws Exception {
087    // Make block sizes small.
088    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
089    // needed for testAppendClose()
090    // quicker heartbeat interval for faster DN death notification
091    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
092    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
093    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
094
095    // faster failover with cluster.shutdown();fs.close() idiom
096    TEST_UTIL.getConfiguration()
097        .setInt("hbase.ipc.client.connect.max.retries", 1);
098    TEST_UTIL.getConfiguration().setInt(
099        "dfs.client.block.recovery.retries", 1);
100    TEST_UTIL.getConfiguration().setInt(
101      "hbase.ipc.client.connection.maxidletime", 500);
102    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
103        SampleRegionWALCoprocessor.class.getName());
104    TEST_UTIL.startMiniDFSCluster(3);
105  }
106
107  @AfterClass
108  public static void tearDownAfterClass() throws Exception {
109    TEST_UTIL.shutdownMiniCluster();
110  }
111
112  /**
113   * Reads the WAL with and without WALTrailer.
114   * @throws IOException
115   */
116  @Test
117  public void testWALTrailer() throws IOException {
118    // read With trailer.
119    doRead(true);
120    // read without trailer
121    doRead(false);
122  }
123
124  /**
125   * Appends entries in the WAL and reads it.
126   * @param withTrailer If 'withTrailer' is true, it calls a close on the WALwriter before reading
127   *          so that a trailer is appended to the WAL. Otherwise, it starts reading after the sync
128   *          call. This means that reader is not aware of the trailer. In this scenario, if the
129   *          reader tries to read the trailer in its next() call, it returns false from
130   *          ProtoBufLogReader.
131   * @throws IOException
132   */
133  private void doRead(boolean withTrailer) throws IOException {
134    final int columnCount = 5;
135    final int recordCount = 5;
136    final TableName tableName =
137        TableName.valueOf("tablename");
138    final byte[] row = Bytes.toBytes("row");
139    long timestamp = System.currentTimeMillis();
140    Path path = new Path(dir, "tempwal");
141    // delete the log if already exists, for test only
142    fs.delete(path, true);
143    W writer = null;
144    ProtobufLogReader reader = null;
145    try {
146      HRegionInfo hri = new HRegionInfo(tableName,
147          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
148      HTableDescriptor htd = new HTableDescriptor(tableName);
149      fs.mkdirs(dir);
150      // Write log in pb format.
151      writer = createWriter(path);
152      for (int i = 0; i < recordCount; ++i) {
153        WALKeyImpl key = new WALKeyImpl(
154            hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID);
155        WALEdit edit = new WALEdit();
156        for (int j = 0; j < columnCount; ++j) {
157          if (i == 0) {
158            htd.addFamily(new HColumnDescriptor("column" + j));
159          }
160          String value = i + "" + j;
161          edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value)));
162        }
163        append(writer, new WAL.Entry(key, edit));
164      }
165      sync(writer);
166      if (withTrailer) writer.close();
167
168      // Now read the log using standard means.
169      reader = (ProtobufLogReader) wals.createReader(fs, path);
170      if (withTrailer) {
171        assertNotNull(reader.trailer);
172      } else {
173        assertNull(reader.trailer);
174      }
175      for (int i = 0; i < recordCount; ++i) {
176        WAL.Entry entry = reader.next();
177        assertNotNull(entry);
178        assertEquals(columnCount, entry.getEdit().size());
179        assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
180        assertEquals(tableName, entry.getKey().getTableName());
181        int idx = 0;
182        for (Cell val : entry.getEdit().getCells()) {
183          assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(),
184            val.getRowLength()));
185          String value = i + "" + idx;
186          assertArrayEquals(Bytes.toBytes(value), CellUtil.cloneValue(val));
187          idx++;
188        }
189      }
190      WAL.Entry entry = reader.next();
191      assertNull(entry);
192    } finally {
193      if (writer != null) {
194        writer.close();
195      }
196      if (reader != null) {
197        reader.close();
198      }
199    }
200  }
201
202  protected abstract W createWriter(Path path) throws IOException;
203
204  protected abstract void append(W writer, WAL.Entry entry) throws IOException;
205
206  protected abstract void sync(W writer) throws IOException;
207}