001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.Arrays;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.ServerName;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
033import org.apache.hadoop.hbase.client.Durability;
034import org.apache.hadoop.hbase.client.Increment;
035import org.apache.hadoop.hbase.client.Put;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.client.RegionInfoBuilder;
038import org.apache.hadoop.hbase.client.Result;
039import org.apache.hadoop.hbase.client.TableDescriptor;
040import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
041import org.apache.hadoop.hbase.regionserver.ChunkCreator;
042import org.apache.hadoop.hbase.regionserver.HRegion;
043import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
044import org.apache.hadoop.hbase.testclassification.MediumTests;
045import org.apache.hadoop.hbase.testclassification.RegionServerTests;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.hbase.util.CommonFSUtils;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
049import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
050import org.apache.hadoop.hbase.wal.WAL;
051import org.apache.hadoop.hbase.wal.WALFactory;
052import org.apache.hadoop.hdfs.MiniDFSCluster;
053import org.junit.After;
054import org.junit.AfterClass;
055import org.junit.Before;
056import org.junit.BeforeClass;
057import org.junit.ClassRule;
058import org.junit.Rule;
059import org.junit.Test;
060import org.junit.experimental.categories.Category;
061import org.junit.rules.TestName;
062import org.junit.runner.RunWith;
063import org.junit.runners.Parameterized;
064import org.junit.runners.Parameterized.Parameter;
065import org.junit.runners.Parameterized.Parameters;
066
067/**
068 * Tests for WAL write durability
069 */
070@RunWith(Parameterized.class)
071@Category({ RegionServerTests.class, MediumTests.class })
072public class TestDurability {
073
074  @ClassRule
075  public static final HBaseClassTestRule CLASS_RULE =
076    HBaseClassTestRule.forClass(TestDurability.class);
077
078  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
079  private static FileSystem FS;
080  private static MiniDFSCluster CLUSTER;
081  private static Configuration CONF;
082  private static Path DIR;
083
084  private static byte[] FAMILY = Bytes.toBytes("family");
085  private static byte[] ROW = Bytes.toBytes("row");
086  private static byte[] COL = Bytes.toBytes("col");
087
088  @Parameter
089  public String walProvider;
090
091  @Rule
092  public TestName name = new TestName();
093
094  @Parameters(name = "{index}: provider={0}")
095  public static Iterable<Object[]> data() {
096    return Arrays.asList(new Object[] { "defaultProvider" }, new Object[] { "asyncfs" });
097  }
098
099  @BeforeClass
100  public static void setUpBeforeClass() throws Exception {
101    CONF = TEST_UTIL.getConfiguration();
102    TEST_UTIL.startMiniDFSCluster(1);
103
104    CLUSTER = TEST_UTIL.getDFSCluster();
105    FS = CLUSTER.getFileSystem();
106    DIR = TEST_UTIL.getDataTestDirOnTestFS("TestDurability");
107    CommonFSUtils.setRootDir(CONF, DIR);
108  }
109
110  @AfterClass
111  public static void tearDownAfterClass() throws Exception {
112    TEST_UTIL.shutdownMiniCluster();
113  }
114
115  @Before
116  public void setUp() {
117    CONF.set(WALFactory.WAL_PROVIDER, walProvider);
118  }
119
120  @After
121  public void tearDown() throws IOException {
122    FS.delete(DIR, true);
123  }
124
125  @Test
126  public void testDurability() throws Exception {
127    WALFactory wals = new WALFactory(CONF,
128      ServerName.valueOf("TestDurability", 16010, EnvironmentEdgeManager.currentTime()).toString());
129    HRegion region = createHRegion(wals, Durability.USE_DEFAULT);
130    WAL wal = region.getWAL();
131    HRegion deferredRegion = createHRegion(region.getTableDescriptor(), region.getRegionInfo(),
132      "deferredRegion", wal, Durability.ASYNC_WAL);
133
134    region.put(newPut(null));
135    verifyWALCount(wals, wal, 1);
136
137    // a put through the deferred table does not write to the wal immediately,
138    // but maybe has been successfully sync-ed by the underlying AsyncWriter +
139    // AsyncFlusher thread
140    deferredRegion.put(newPut(null));
141    // but will after we sync the wal
142    wal.sync();
143    verifyWALCount(wals, wal, 2);
144
145    // a put through a deferred table will be sync with the put sync'ed put
146    deferredRegion.put(newPut(null));
147    wal.sync();
148    verifyWALCount(wals, wal, 3);
149    region.put(newPut(null));
150    verifyWALCount(wals, wal, 4);
151
152    // a put through a deferred table will be sync with the put sync'ed put
153    deferredRegion.put(newPut(Durability.USE_DEFAULT));
154    wal.sync();
155    verifyWALCount(wals, wal, 5);
156    region.put(newPut(Durability.USE_DEFAULT));
157    verifyWALCount(wals, wal, 6);
158
159    // SKIP_WAL never writes to the wal
160    region.put(newPut(Durability.SKIP_WAL));
161    deferredRegion.put(newPut(Durability.SKIP_WAL));
162    verifyWALCount(wals, wal, 6);
163    wal.sync();
164    verifyWALCount(wals, wal, 6);
165
166    // Async overrides sync table default
167    region.put(newPut(Durability.ASYNC_WAL));
168    deferredRegion.put(newPut(Durability.ASYNC_WAL));
169    wal.sync();
170    verifyWALCount(wals, wal, 8);
171
172    // sync overrides async table default
173    region.put(newPut(Durability.SYNC_WAL));
174    deferredRegion.put(newPut(Durability.SYNC_WAL));
175    verifyWALCount(wals, wal, 10);
176
177    // fsync behaves like sync
178    region.put(newPut(Durability.FSYNC_WAL));
179    deferredRegion.put(newPut(Durability.FSYNC_WAL));
180    verifyWALCount(wals, wal, 12);
181  }
182
183  @Test
184  public void testIncrement() throws Exception {
185    byte[] row1 = Bytes.toBytes("row1");
186    byte[] col1 = Bytes.toBytes("col1");
187    byte[] col2 = Bytes.toBytes("col2");
188    byte[] col3 = Bytes.toBytes("col3");
189
190    // Setting up region
191    WALFactory wals = new WALFactory(CONF,
192      ServerName.valueOf("TestIncrement", 16010, EnvironmentEdgeManager.currentTime()).toString());
193    HRegion region = createHRegion(wals, Durability.USE_DEFAULT);
194    WAL wal = region.getWAL();
195
196    // col1: amount = 0, 1 write back to WAL
197    Increment inc1 = new Increment(row1);
198    inc1.addColumn(FAMILY, col1, 0);
199    Result res = region.increment(inc1);
200    assertEquals(1, res.size());
201    assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col1)));
202    verifyWALCount(wals, wal, 1);
203
204    // col1: amount = 1, 1 write back to WAL
205    inc1 = new Increment(row1);
206    inc1.addColumn(FAMILY, col1, 1);
207    res = region.increment(inc1);
208    assertEquals(1, res.size());
209    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
210    verifyWALCount(wals, wal, 2);
211
212    // col1: amount = 0, 1 write back to WAL
213    inc1 = new Increment(row1);
214    inc1.addColumn(FAMILY, col1, 0);
215    res = region.increment(inc1);
216    assertEquals(1, res.size());
217    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
218    verifyWALCount(wals, wal, 3);
219    // col1: amount = 0, col2: amount = 0, col3: amount = 0
220    // 1 write back to WAL
221    inc1 = new Increment(row1);
222    inc1.addColumn(FAMILY, col1, 0);
223    inc1.addColumn(FAMILY, col2, 0);
224    inc1.addColumn(FAMILY, col3, 0);
225    res = region.increment(inc1);
226    assertEquals(3, res.size());
227    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
228    assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col2)));
229    assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col3)));
230    verifyWALCount(wals, wal, 4);
231
232    // col1: amount = 5, col2: amount = 4, col3: amount = 3
233    // 1 write back to WAL
234    inc1 = new Increment(row1);
235    inc1.addColumn(FAMILY, col1, 5);
236    inc1.addColumn(FAMILY, col2, 4);
237    inc1.addColumn(FAMILY, col3, 3);
238    res = region.increment(inc1);
239    assertEquals(3, res.size());
240    assertEquals(6, Bytes.toLong(res.getValue(FAMILY, col1)));
241    assertEquals(4, Bytes.toLong(res.getValue(FAMILY, col2)));
242    assertEquals(3, Bytes.toLong(res.getValue(FAMILY, col3)));
243    verifyWALCount(wals, wal, 5);
244  }
245
246  /**
247   * Test when returnResults set to false in increment it should not return the result instead it
248   * resturn null.
249   */
250  @Test
251  public void testIncrementWithReturnResultsSetToFalse() throws Exception {
252    byte[] row1 = Bytes.toBytes("row1");
253    byte[] col1 = Bytes.toBytes("col1");
254
255    // Setting up region
256    WALFactory wals =
257      new WALFactory(CONF, ServerName.valueOf("testIncrementWithReturnResultsSetToFalse", 16010,
258        EnvironmentEdgeManager.currentTime()).toString());
259    HRegion region = createHRegion(wals, Durability.USE_DEFAULT);
260
261    Increment inc1 = new Increment(row1);
262    inc1.setReturnResults(false);
263    inc1.addColumn(FAMILY, col1, 1);
264    Result res = region.increment(inc1);
265    assertTrue(res.isEmpty());
266  }
267
268  private Put newPut(Durability durability) {
269    Put p = new Put(ROW);
270    p.addColumn(FAMILY, COL, COL);
271    if (durability != null) {
272      p.setDurability(durability);
273    }
274    return p;
275  }
276
277  private void verifyWALCount(WALFactory wals, WAL log, int expected) throws Exception {
278    Path walPath = AbstractFSWALProvider.getCurrentFileName(log);
279    WAL.Reader reader = wals.createReader(FS, walPath);
280    int count = 0;
281    WAL.Entry entry = new WAL.Entry();
282    while (reader.next(entry) != null) {
283      count++;
284    }
285    reader.close();
286    assertEquals(expected, count);
287  }
288
289  // lifted from TestAtomicOperation
290  private HRegion createHRegion(WALFactory wals, Durability durability) throws IOException {
291    TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^A-Za-z0-9-_]", "_"));
292    TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
293      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
294    RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
295    Path path = new Path(DIR, tableName.getNameAsString());
296    if (FS.exists(path)) {
297      if (!FS.delete(path, true)) {
298        throw new IOException("Failed delete of " + path);
299      }
300    }
301    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
302      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
303    return HRegion.createHRegion(info, path, CONF, htd, wals.getWAL(info));
304  }
305
306  private HRegion createHRegion(TableDescriptor td, RegionInfo info, String dir, WAL wal,
307    Durability durability) throws IOException {
308    Path path = new Path(DIR, dir);
309    if (FS.exists(path)) {
310      if (!FS.delete(path, true)) {
311        throw new IOException("Failed delete of " + path);
312      }
313    }
314    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
315      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
316    return HRegion.createHRegion(info, path, CONF, td, wal);
317  }
318}