001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.Arrays;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.ServerName;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
033import org.apache.hadoop.hbase.client.Durability;
034import org.apache.hadoop.hbase.client.Increment;
035import org.apache.hadoop.hbase.client.Put;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.client.RegionInfoBuilder;
038import org.apache.hadoop.hbase.client.Result;
039import org.apache.hadoop.hbase.client.TableDescriptor;
040import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
041import org.apache.hadoop.hbase.regionserver.ChunkCreator;
042import org.apache.hadoop.hbase.regionserver.HRegion;
043import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
044import org.apache.hadoop.hbase.testclassification.MediumTests;
045import org.apache.hadoop.hbase.testclassification.RegionServerTests;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.hbase.util.CommonFSUtils;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
049import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
050import org.apache.hadoop.hbase.wal.WAL;
051import org.apache.hadoop.hbase.wal.WALFactory;
052import org.apache.hadoop.hdfs.MiniDFSCluster;
053import org.junit.After;
054import org.junit.AfterClass;
055import org.junit.Before;
056import org.junit.BeforeClass;
057import org.junit.ClassRule;
058import org.junit.Rule;
059import org.junit.Test;
060import org.junit.experimental.categories.Category;
061import org.junit.rules.TestName;
062import org.junit.runner.RunWith;
063import org.junit.runners.Parameterized;
064import org.junit.runners.Parameterized.Parameter;
065import org.junit.runners.Parameterized.Parameters;
066
067/**
068 * Tests for WAL write durability
069 */
070@RunWith(Parameterized.class)
071@Category({ RegionServerTests.class, MediumTests.class })
072public class TestDurability {
073
074  @ClassRule
075  public static final HBaseClassTestRule CLASS_RULE =
076      HBaseClassTestRule.forClass(TestDurability.class);
077
078  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
079  private static FileSystem FS;
080  private static MiniDFSCluster CLUSTER;
081  private static Configuration CONF;
082  private static Path DIR;
083
084  private static byte[] FAMILY = Bytes.toBytes("family");
085  private static byte[] ROW = Bytes.toBytes("row");
086  private static byte[] COL = Bytes.toBytes("col");
087
088  @Parameter
089  public String walProvider;
090
091  @Rule
092  public TestName name = new TestName();
093
094  @Parameters(name = "{index}: provider={0}")
095  public static Iterable<Object[]> data() {
096    return Arrays.asList(new Object[] { "defaultProvider" }, new Object[] { "asyncfs" });
097  }
098
099  @BeforeClass
100  public static void setUpBeforeClass() throws Exception {
101    CONF = TEST_UTIL.getConfiguration();
102    TEST_UTIL.startMiniDFSCluster(1);
103
104    CLUSTER = TEST_UTIL.getDFSCluster();
105    FS = CLUSTER.getFileSystem();
106    DIR = TEST_UTIL.getDataTestDirOnTestFS("TestDurability");
107    CommonFSUtils.setRootDir(CONF, DIR);
108  }
109
110  @AfterClass
111  public static void tearDownAfterClass() throws Exception {
112    TEST_UTIL.shutdownMiniCluster();
113  }
114
115  @Before
116  public void setUp() {
117    CONF.set(WALFactory.WAL_PROVIDER, walProvider);
118  }
119
120  @After
121  public void tearDown() throws IOException {
122    FS.delete(DIR, true);
123  }
124
125  @Test
126  public void testDurability() throws Exception {
127    WALFactory wals = new WALFactory(CONF,
128      ServerName.valueOf("TestDurability", 16010, EnvironmentEdgeManager.currentTime())
129        .toString());
130    HRegion region = createHRegion(wals, Durability.USE_DEFAULT);
131    WAL wal = region.getWAL();
132    HRegion deferredRegion = createHRegion(region.getTableDescriptor(), region.getRegionInfo(),
133      "deferredRegion", wal, Durability.ASYNC_WAL);
134
135    region.put(newPut(null));
136    verifyWALCount(wals, wal, 1);
137
138    // a put through the deferred table does not write to the wal immediately,
139    // but maybe has been successfully sync-ed by the underlying AsyncWriter +
140    // AsyncFlusher thread
141    deferredRegion.put(newPut(null));
142    // but will after we sync the wal
143    wal.sync();
144    verifyWALCount(wals, wal, 2);
145
146    // a put through a deferred table will be sync with the put sync'ed put
147    deferredRegion.put(newPut(null));
148    wal.sync();
149    verifyWALCount(wals, wal, 3);
150    region.put(newPut(null));
151    verifyWALCount(wals, wal, 4);
152
153    // a put through a deferred table will be sync with the put sync'ed put
154    deferredRegion.put(newPut(Durability.USE_DEFAULT));
155    wal.sync();
156    verifyWALCount(wals, wal, 5);
157    region.put(newPut(Durability.USE_DEFAULT));
158    verifyWALCount(wals, wal, 6);
159
160    // SKIP_WAL never writes to the wal
161    region.put(newPut(Durability.SKIP_WAL));
162    deferredRegion.put(newPut(Durability.SKIP_WAL));
163    verifyWALCount(wals, wal, 6);
164    wal.sync();
165    verifyWALCount(wals, wal, 6);
166
167    // Async overrides sync table default
168    region.put(newPut(Durability.ASYNC_WAL));
169    deferredRegion.put(newPut(Durability.ASYNC_WAL));
170    wal.sync();
171    verifyWALCount(wals, wal, 8);
172
173    // sync overrides async table default
174    region.put(newPut(Durability.SYNC_WAL));
175    deferredRegion.put(newPut(Durability.SYNC_WAL));
176    verifyWALCount(wals, wal, 10);
177
178    // fsync behaves like sync
179    region.put(newPut(Durability.FSYNC_WAL));
180    deferredRegion.put(newPut(Durability.FSYNC_WAL));
181    verifyWALCount(wals, wal, 12);
182  }
183
184  @Test
185  public void testIncrement() throws Exception {
186    byte[] row1 = Bytes.toBytes("row1");
187    byte[] col1 = Bytes.toBytes("col1");
188    byte[] col2 = Bytes.toBytes("col2");
189    byte[] col3 = Bytes.toBytes("col3");
190
191    // Setting up region
192    WALFactory wals = new WALFactory(CONF,
193      ServerName.valueOf("TestIncrement", 16010, EnvironmentEdgeManager.currentTime())
194        .toString());
195    HRegion region = createHRegion(wals, Durability.USE_DEFAULT);
196    WAL wal = region.getWAL();
197
198    // col1: amount = 0, 1 write back to WAL
199    Increment inc1 = new Increment(row1);
200    inc1.addColumn(FAMILY, col1, 0);
201    Result res = region.increment(inc1);
202    assertEquals(1, res.size());
203    assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col1)));
204    verifyWALCount(wals, wal, 1);
205
206    // col1: amount = 1, 1 write back to WAL
207    inc1 = new Increment(row1);
208    inc1.addColumn(FAMILY, col1, 1);
209    res = region.increment(inc1);
210    assertEquals(1, res.size());
211    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
212    verifyWALCount(wals, wal, 2);
213
214    // col1: amount = 0, 1 write back to WAL
215    inc1 = new Increment(row1);
216    inc1.addColumn(FAMILY, col1, 0);
217    res = region.increment(inc1);
218    assertEquals(1, res.size());
219    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
220    verifyWALCount(wals, wal, 3);
221    // col1: amount = 0, col2: amount = 0, col3: amount = 0
222    // 1 write back to WAL
223    inc1 = new Increment(row1);
224    inc1.addColumn(FAMILY, col1, 0);
225    inc1.addColumn(FAMILY, col2, 0);
226    inc1.addColumn(FAMILY, col3, 0);
227    res = region.increment(inc1);
228    assertEquals(3, res.size());
229    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
230    assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col2)));
231    assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col3)));
232    verifyWALCount(wals, wal, 4);
233
234    // col1: amount = 5, col2: amount = 4, col3: amount = 3
235    // 1 write back to WAL
236    inc1 = new Increment(row1);
237    inc1.addColumn(FAMILY, col1, 5);
238    inc1.addColumn(FAMILY, col2, 4);
239    inc1.addColumn(FAMILY, col3, 3);
240    res = region.increment(inc1);
241    assertEquals(3, res.size());
242    assertEquals(6, Bytes.toLong(res.getValue(FAMILY, col1)));
243    assertEquals(4, Bytes.toLong(res.getValue(FAMILY, col2)));
244    assertEquals(3, Bytes.toLong(res.getValue(FAMILY, col3)));
245    verifyWALCount(wals, wal, 5);
246  }
247
248  /**
249   * Test when returnResults set to false in increment it should not return the result instead it
250   * resturn null.
251   */
252  @Test
253  public void testIncrementWithReturnResultsSetToFalse() throws Exception {
254    byte[] row1 = Bytes.toBytes("row1");
255    byte[] col1 = Bytes.toBytes("col1");
256
257    // Setting up region
258    WALFactory wals = new WALFactory(CONF,
259      ServerName.valueOf("testIncrementWithReturnResultsSetToFalse",
260        16010, EnvironmentEdgeManager.currentTime())
261          .toString());
262    HRegion region = createHRegion(wals, Durability.USE_DEFAULT);
263
264    Increment inc1 = new Increment(row1);
265    inc1.setReturnResults(false);
266    inc1.addColumn(FAMILY, col1, 1);
267    Result res = region.increment(inc1);
268    assertTrue(res.isEmpty());
269  }
270
271  private Put newPut(Durability durability) {
272    Put p = new Put(ROW);
273    p.addColumn(FAMILY, COL, COL);
274    if (durability != null) {
275      p.setDurability(durability);
276    }
277    return p;
278  }
279
280  private void verifyWALCount(WALFactory wals, WAL log, int expected) throws Exception {
281    Path walPath = AbstractFSWALProvider.getCurrentFileName(log);
282    WAL.Reader reader = wals.createReader(FS, walPath);
283    int count = 0;
284    WAL.Entry entry = new WAL.Entry();
285    while (reader.next(entry) != null) {
286      count++;
287    }
288    reader.close();
289    assertEquals(expected, count);
290  }
291
292  // lifted from TestAtomicOperation
293  private HRegion createHRegion(WALFactory wals, Durability durability) throws IOException {
294    TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^A-Za-z0-9-_]", "_"));
295    TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
296        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
297    RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
298    Path path = new Path(DIR, tableName.getNameAsString());
299    if (FS.exists(path)) {
300      if (!FS.delete(path, true)) {
301        throw new IOException("Failed delete of " + path);
302      }
303    }
304    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
305      0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
306    return HRegion.createHRegion(info, path, CONF, htd, wals.getWAL(info));
307  }
308
309  private HRegion createHRegion(TableDescriptor td, RegionInfo info, String dir, WAL wal,
310      Durability durability) throws IOException {
311    Path path = new Path(DIR, dir);
312    if (FS.exists(path)) {
313      if (!FS.delete(path, true)) {
314        throw new IOException("Failed delete of " + path);
315      }
316    }
317    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
318      0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
319    return HRegion.createHRegion(info, path, CONF, td, wal);
320  }
321}