001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.Arrays;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtility;
030import org.apache.hadoop.hbase.ServerName;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
033import org.apache.hadoop.hbase.client.Durability;
034import org.apache.hadoop.hbase.client.Increment;
035import org.apache.hadoop.hbase.client.Put;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.client.RegionInfoBuilder;
038import org.apache.hadoop.hbase.client.Result;
039import org.apache.hadoop.hbase.client.TableDescriptor;
040import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
041import org.apache.hadoop.hbase.regionserver.ChunkCreator;
042import org.apache.hadoop.hbase.regionserver.HRegion;
043import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
044import org.apache.hadoop.hbase.testclassification.MediumTests;
045import org.apache.hadoop.hbase.testclassification.RegionServerTests;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.hbase.util.FSUtils;
048import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
049import org.apache.hadoop.hbase.wal.WAL;
050import org.apache.hadoop.hbase.wal.WALFactory;
051import org.apache.hadoop.hdfs.MiniDFSCluster;
052import org.junit.After;
053import org.junit.AfterClass;
054import org.junit.Before;
055import org.junit.BeforeClass;
056import org.junit.ClassRule;
057import org.junit.Rule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060import org.junit.rules.TestName;
061import org.junit.runner.RunWith;
062import org.junit.runners.Parameterized;
063import org.junit.runners.Parameterized.Parameter;
064import org.junit.runners.Parameterized.Parameters;
065
066/**
067 * Tests for WAL write durability
068 */
069@RunWith(Parameterized.class)
070@Category({ RegionServerTests.class, MediumTests.class })
071public class TestDurability {
072
073  @ClassRule
074  public static final HBaseClassTestRule CLASS_RULE =
075      HBaseClassTestRule.forClass(TestDurability.class);
076
077  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
078  private static FileSystem FS;
079  private static MiniDFSCluster CLUSTER;
080  private static Configuration CONF;
081  private static Path DIR;
082
083  private static byte[] FAMILY = Bytes.toBytes("family");
084  private static byte[] ROW = Bytes.toBytes("row");
085  private static byte[] COL = Bytes.toBytes("col");
086
087  @Parameter
088  public String walProvider;
089
090  @Rule
091  public TestName name = new TestName();
092
093  @Parameters(name = "{index}: provider={0}")
094  public static Iterable<Object[]> data() {
095    return Arrays.asList(new Object[] { "defaultProvider" }, new Object[] { "asyncfs" });
096  }
097
098  @BeforeClass
099  public static void setUpBeforeClass() throws Exception {
100    CONF = TEST_UTIL.getConfiguration();
101    TEST_UTIL.startMiniDFSCluster(1);
102
103    CLUSTER = TEST_UTIL.getDFSCluster();
104    FS = CLUSTER.getFileSystem();
105    DIR = TEST_UTIL.getDataTestDirOnTestFS("TestDurability");
106    FSUtils.setRootDir(CONF, DIR);
107  }
108
109  @AfterClass
110  public static void tearDownAfterClass() throws Exception {
111    TEST_UTIL.shutdownMiniCluster();
112  }
113
114  @Before
115  public void setUp() {
116    CONF.set(WALFactory.WAL_PROVIDER, walProvider);
117  }
118
119  @After
120  public void tearDown() throws IOException {
121    FS.delete(DIR, true);
122  }
123
124  @Test
125  public void testDurability() throws Exception {
126    WALFactory wals = new WALFactory(CONF,
127        ServerName.valueOf("TestDurability", 16010, System.currentTimeMillis()).toString());
128    HRegion region = createHRegion(wals, Durability.USE_DEFAULT);
129    WAL wal = region.getWAL();
130    HRegion deferredRegion = createHRegion(region.getTableDescriptor(), region.getRegionInfo(),
131      "deferredRegion", wal, Durability.ASYNC_WAL);
132
133    region.put(newPut(null));
134    verifyWALCount(wals, wal, 1);
135
136    // a put through the deferred table does not write to the wal immediately,
137    // but maybe has been successfully sync-ed by the underlying AsyncWriter +
138    // AsyncFlusher thread
139    deferredRegion.put(newPut(null));
140    // but will after we sync the wal
141    wal.sync();
142    verifyWALCount(wals, wal, 2);
143
144    // a put through a deferred table will be sync with the put sync'ed put
145    deferredRegion.put(newPut(null));
146    wal.sync();
147    verifyWALCount(wals, wal, 3);
148    region.put(newPut(null));
149    verifyWALCount(wals, wal, 4);
150
151    // a put through a deferred table will be sync with the put sync'ed put
152    deferredRegion.put(newPut(Durability.USE_DEFAULT));
153    wal.sync();
154    verifyWALCount(wals, wal, 5);
155    region.put(newPut(Durability.USE_DEFAULT));
156    verifyWALCount(wals, wal, 6);
157
158    // SKIP_WAL never writes to the wal
159    region.put(newPut(Durability.SKIP_WAL));
160    deferredRegion.put(newPut(Durability.SKIP_WAL));
161    verifyWALCount(wals, wal, 6);
162    wal.sync();
163    verifyWALCount(wals, wal, 6);
164
165    // Async overrides sync table default
166    region.put(newPut(Durability.ASYNC_WAL));
167    deferredRegion.put(newPut(Durability.ASYNC_WAL));
168    wal.sync();
169    verifyWALCount(wals, wal, 8);
170
171    // sync overrides async table default
172    region.put(newPut(Durability.SYNC_WAL));
173    deferredRegion.put(newPut(Durability.SYNC_WAL));
174    verifyWALCount(wals, wal, 10);
175
176    // fsync behaves like sync
177    region.put(newPut(Durability.FSYNC_WAL));
178    deferredRegion.put(newPut(Durability.FSYNC_WAL));
179    verifyWALCount(wals, wal, 12);
180  }
181
182  @Test
183  public void testIncrement() throws Exception {
184    byte[] row1 = Bytes.toBytes("row1");
185    byte[] col1 = Bytes.toBytes("col1");
186    byte[] col2 = Bytes.toBytes("col2");
187    byte[] col3 = Bytes.toBytes("col3");
188
189    // Setting up region
190    WALFactory wals = new WALFactory(CONF,
191        ServerName.valueOf("TestIncrement", 16010, System.currentTimeMillis()).toString());
192    HRegion region = createHRegion(wals, Durability.USE_DEFAULT);
193    WAL wal = region.getWAL();
194
195    // col1: amount = 0, 1 write back to WAL
196    Increment inc1 = new Increment(row1);
197    inc1.addColumn(FAMILY, col1, 0);
198    Result res = region.increment(inc1);
199    assertEquals(1, res.size());
200    assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col1)));
201    verifyWALCount(wals, wal, 1);
202
203    // col1: amount = 1, 1 write back to WAL
204    inc1 = new Increment(row1);
205    inc1.addColumn(FAMILY, col1, 1);
206    res = region.increment(inc1);
207    assertEquals(1, res.size());
208    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
209    verifyWALCount(wals, wal, 2);
210
211    // col1: amount = 0, 1 write back to WAL
212    inc1 = new Increment(row1);
213    inc1.addColumn(FAMILY, col1, 0);
214    res = region.increment(inc1);
215    assertEquals(1, res.size());
216    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
217    verifyWALCount(wals, wal, 3);
218    // col1: amount = 0, col2: amount = 0, col3: amount = 0
219    // 1 write back to WAL
220    inc1 = new Increment(row1);
221    inc1.addColumn(FAMILY, col1, 0);
222    inc1.addColumn(FAMILY, col2, 0);
223    inc1.addColumn(FAMILY, col3, 0);
224    res = region.increment(inc1);
225    assertEquals(3, res.size());
226    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
227    assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col2)));
228    assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col3)));
229    verifyWALCount(wals, wal, 4);
230
231    // col1: amount = 5, col2: amount = 4, col3: amount = 3
232    // 1 write back to WAL
233    inc1 = new Increment(row1);
234    inc1.addColumn(FAMILY, col1, 5);
235    inc1.addColumn(FAMILY, col2, 4);
236    inc1.addColumn(FAMILY, col3, 3);
237    res = region.increment(inc1);
238    assertEquals(3, res.size());
239    assertEquals(6, Bytes.toLong(res.getValue(FAMILY, col1)));
240    assertEquals(4, Bytes.toLong(res.getValue(FAMILY, col2)));
241    assertEquals(3, Bytes.toLong(res.getValue(FAMILY, col3)));
242    verifyWALCount(wals, wal, 5);
243  }
244
245  /**
246   * Test when returnResults set to false in increment it should not return the result instead it
247   * resturn null.
248   */
249  @Test
250  public void testIncrementWithReturnResultsSetToFalse() throws Exception {
251    byte[] row1 = Bytes.toBytes("row1");
252    byte[] col1 = Bytes.toBytes("col1");
253
254    // Setting up region
255    WALFactory wals = new WALFactory(CONF,
256        ServerName
257            .valueOf("testIncrementWithReturnResultsSetToFalse", 16010, System.currentTimeMillis())
258            .toString());
259    HRegion region = createHRegion(wals, Durability.USE_DEFAULT);
260
261    Increment inc1 = new Increment(row1);
262    inc1.setReturnResults(false);
263    inc1.addColumn(FAMILY, col1, 1);
264    Result res = region.increment(inc1);
265    assertTrue(res.isEmpty());
266  }
267
268  private Put newPut(Durability durability) {
269    Put p = new Put(ROW);
270    p.addColumn(FAMILY, COL, COL);
271    if (durability != null) {
272      p.setDurability(durability);
273    }
274    return p;
275  }
276
277  private void verifyWALCount(WALFactory wals, WAL log, int expected) throws Exception {
278    Path walPath = AbstractFSWALProvider.getCurrentFileName(log);
279    WAL.Reader reader = wals.createReader(FS, walPath);
280    int count = 0;
281    WAL.Entry entry = new WAL.Entry();
282    while (reader.next(entry) != null) {
283      count++;
284    }
285    reader.close();
286    assertEquals(expected, count);
287  }
288
289  // lifted from TestAtomicOperation
290  private HRegion createHRegion(WALFactory wals, Durability durability) throws IOException {
291    TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^A-Za-z0-9-_]", "_"));
292    TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
293        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
294    RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
295    Path path = new Path(DIR, tableName.getNameAsString());
296    if (FS.exists(path)) {
297      if (!FS.delete(path, true)) {
298        throw new IOException("Failed delete of " + path);
299      }
300    }
301    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
302    return HRegion.createHRegion(info, path, CONF, htd, wals.getWAL(info));
303  }
304
305  private HRegion createHRegion(TableDescriptor td, RegionInfo info, String dir, WAL wal,
306      Durability durability) throws IOException {
307    Path path = new Path(DIR, dir);
308    if (FS.exists(path)) {
309      if (!FS.delete(path, true)) {
310        throw new IOException("Failed delete of " + path);
311      }
312    }
313    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
314    return HRegion.createHRegion(info, path, CONF, td, wal);
315  }
316}