001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.io.IOException;
024import java.util.stream.Stream;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.ServerName;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
033import org.apache.hadoop.hbase.client.Durability;
034import org.apache.hadoop.hbase.client.Increment;
035import org.apache.hadoop.hbase.client.Put;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.client.RegionInfoBuilder;
038import org.apache.hadoop.hbase.client.Result;
039import org.apache.hadoop.hbase.client.TableDescriptor;
040import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
041import org.apache.hadoop.hbase.regionserver.ChunkCreator;
042import org.apache.hadoop.hbase.regionserver.HRegion;
043import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
044import org.apache.hadoop.hbase.testclassification.MediumTests;
045import org.apache.hadoop.hbase.testclassification.RegionServerTests;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.hbase.util.CommonFSUtils;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
049import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
050import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader;
051import org.apache.hadoop.hbase.wal.WAL;
052import org.apache.hadoop.hbase.wal.WALFactory;
053import org.apache.hadoop.hdfs.MiniDFSCluster;
054import org.junit.jupiter.api.AfterAll;
055import org.junit.jupiter.api.AfterEach;
056import org.junit.jupiter.api.BeforeAll;
057import org.junit.jupiter.api.BeforeEach;
058import org.junit.jupiter.api.Tag;
059import org.junit.jupiter.api.TestInfo;
060import org.junit.jupiter.api.TestTemplate;
061import org.junit.jupiter.params.provider.Arguments;
062
063/**
064 * Tests for WAL write durability
065 */
066@Tag(RegionServerTests.TAG)
067@Tag(MediumTests.TAG)
068@HBaseParameterizedTestTemplate(name = "{index}: provider={0}")
069public class TestDurability {
070
071  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
072  private static FileSystem FS;
073  private static MiniDFSCluster CLUSTER;
074  private static Configuration CONF;
075  private static Path DIR;
076
077  private static byte[] FAMILY = Bytes.toBytes("family");
078  private static byte[] ROW = Bytes.toBytes("row");
079  private static byte[] COL = Bytes.toBytes("col");
080
081  private final String walProvider;
082
083  private String name;
084
085  public static Stream<Arguments> parameters() {
086    return Stream.of(Arguments.of("defaultProvider"), Arguments.of("asyncfs"));
087  }
088
089  public TestDurability(String walProvider) {
090    this.walProvider = walProvider;
091  }
092
093  @BeforeAll
094  public static void setUpBeforeClass() throws Exception {
095    CONF = TEST_UTIL.getConfiguration();
096    TEST_UTIL.startMiniDFSCluster(1);
097
098    CLUSTER = TEST_UTIL.getDFSCluster();
099    FS = CLUSTER.getFileSystem();
100    DIR = TEST_UTIL.getDataTestDirOnTestFS("TestDurability");
101    CommonFSUtils.setRootDir(CONF, DIR);
102  }
103
104  @AfterAll
105  public static void tearDownAfterClass() throws Exception {
106    TEST_UTIL.shutdownMiniCluster();
107  }
108
109  @BeforeEach
110  public void setUp(TestInfo testInfo) {
111    name = testInfo.getTestMethod().get().getName();
112    CONF.set(WALFactory.WAL_PROVIDER, walProvider);
113  }
114
115  @AfterEach
116  public void tearDown() throws IOException {
117    FS.delete(DIR, true);
118  }
119
120  @TestTemplate
121  public void testDurability() throws Exception {
122    WALFactory wals = new WALFactory(CONF,
123      ServerName.valueOf("TestDurability", 16010, EnvironmentEdgeManager.currentTime()).toString());
124    HRegion region = createHRegion(wals, Durability.USE_DEFAULT);
125    WAL wal = region.getWAL();
126    HRegion deferredRegion = createHRegion(region.getTableDescriptor(), region.getRegionInfo(),
127      "deferredRegion", wal, Durability.ASYNC_WAL);
128
129    region.put(newPut(null));
130    verifyWALCount(wals, wal, 1);
131
132    // a put through the deferred table does not write to the wal immediately,
133    // but maybe has been successfully sync-ed by the underlying AsyncWriter +
134    // AsyncFlusher thread
135    deferredRegion.put(newPut(null));
136    // but will after we sync the wal
137    wal.sync();
138    verifyWALCount(wals, wal, 2);
139
140    // a put through a deferred table will be sync with the put sync'ed put
141    deferredRegion.put(newPut(null));
142    wal.sync();
143    verifyWALCount(wals, wal, 3);
144    region.put(newPut(null));
145    verifyWALCount(wals, wal, 4);
146
147    // a put through a deferred table will be sync with the put sync'ed put
148    deferredRegion.put(newPut(Durability.USE_DEFAULT));
149    wal.sync();
150    verifyWALCount(wals, wal, 5);
151    region.put(newPut(Durability.USE_DEFAULT));
152    verifyWALCount(wals, wal, 6);
153
154    // SKIP_WAL never writes to the wal
155    region.put(newPut(Durability.SKIP_WAL));
156    deferredRegion.put(newPut(Durability.SKIP_WAL));
157    verifyWALCount(wals, wal, 6);
158    wal.sync();
159    verifyWALCount(wals, wal, 6);
160
161    // Async overrides sync table default
162    region.put(newPut(Durability.ASYNC_WAL));
163    deferredRegion.put(newPut(Durability.ASYNC_WAL));
164    wal.sync();
165    verifyWALCount(wals, wal, 8);
166
167    // sync overrides async table default
168    region.put(newPut(Durability.SYNC_WAL));
169    deferredRegion.put(newPut(Durability.SYNC_WAL));
170    verifyWALCount(wals, wal, 10);
171
172    // fsync behaves like sync
173    region.put(newPut(Durability.FSYNC_WAL));
174    deferredRegion.put(newPut(Durability.FSYNC_WAL));
175    verifyWALCount(wals, wal, 12);
176  }
177
178  @TestTemplate
179  public void testIncrement() throws Exception {
180    byte[] row1 = Bytes.toBytes("row1");
181    byte[] col1 = Bytes.toBytes("col1");
182    byte[] col2 = Bytes.toBytes("col2");
183    byte[] col3 = Bytes.toBytes("col3");
184
185    // Setting up region
186    WALFactory wals = new WALFactory(CONF,
187      ServerName.valueOf("TestIncrement", 16010, EnvironmentEdgeManager.currentTime()).toString());
188    HRegion region = createHRegion(wals, Durability.USE_DEFAULT);
189    WAL wal = region.getWAL();
190
191    // col1: amount = 0, 1 write back to WAL
192    Increment inc1 = new Increment(row1);
193    inc1.addColumn(FAMILY, col1, 0);
194    Result res = region.increment(inc1);
195    assertEquals(1, res.size());
196    assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col1)));
197    verifyWALCount(wals, wal, 1);
198
199    // col1: amount = 1, 1 write back to WAL
200    inc1 = new Increment(row1);
201    inc1.addColumn(FAMILY, col1, 1);
202    res = region.increment(inc1);
203    assertEquals(1, res.size());
204    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
205    verifyWALCount(wals, wal, 2);
206
207    // col1: amount = 0, 1 write back to WAL
208    inc1 = new Increment(row1);
209    inc1.addColumn(FAMILY, col1, 0);
210    res = region.increment(inc1);
211    assertEquals(1, res.size());
212    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
213    verifyWALCount(wals, wal, 3);
214    // col1: amount = 0, col2: amount = 0, col3: amount = 0
215    // 1 write back to WAL
216    inc1 = new Increment(row1);
217    inc1.addColumn(FAMILY, col1, 0);
218    inc1.addColumn(FAMILY, col2, 0);
219    inc1.addColumn(FAMILY, col3, 0);
220    res = region.increment(inc1);
221    assertEquals(3, res.size());
222    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
223    assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col2)));
224    assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col3)));
225    verifyWALCount(wals, wal, 4);
226
227    // col1: amount = 5, col2: amount = 4, col3: amount = 3
228    // 1 write back to WAL
229    inc1 = new Increment(row1);
230    inc1.addColumn(FAMILY, col1, 5);
231    inc1.addColumn(FAMILY, col2, 4);
232    inc1.addColumn(FAMILY, col3, 3);
233    res = region.increment(inc1);
234    assertEquals(3, res.size());
235    assertEquals(6, Bytes.toLong(res.getValue(FAMILY, col1)));
236    assertEquals(4, Bytes.toLong(res.getValue(FAMILY, col2)));
237    assertEquals(3, Bytes.toLong(res.getValue(FAMILY, col3)));
238    verifyWALCount(wals, wal, 5);
239  }
240
241  /**
242   * Test when returnResults set to false in increment it should not return the result instead it
243   * resturn null.
244   */
245  @TestTemplate
246  public void testIncrementWithReturnResultsSetToFalse() throws Exception {
247    byte[] row1 = Bytes.toBytes("row1");
248    byte[] col1 = Bytes.toBytes("col1");
249
250    // Setting up region
251    WALFactory wals =
252      new WALFactory(CONF, ServerName.valueOf("testIncrementWithReturnResultsSetToFalse", 16010,
253        EnvironmentEdgeManager.currentTime()).toString());
254    HRegion region = createHRegion(wals, Durability.USE_DEFAULT);
255
256    Increment inc1 = new Increment(row1);
257    inc1.setReturnResults(false);
258    inc1.addColumn(FAMILY, col1, 1);
259    Result res = region.increment(inc1);
260    assertTrue(res.isEmpty());
261  }
262
263  private Put newPut(Durability durability) {
264    Put p = new Put(ROW);
265    p.addColumn(FAMILY, COL, COL);
266    if (durability != null) {
267      p.setDurability(durability);
268    }
269    return p;
270  }
271
272  private void verifyWALCount(WALFactory wals, WAL log, int expected) throws Exception {
273    Path walPath = AbstractFSWALProvider.getCurrentFileName(log);
274    assertEquals(expected, NoEOFWALStreamReader.count(wals, FS, walPath));
275  }
276
277  // lifted from TestAtomicOperation
278  private HRegion createHRegion(WALFactory wals, Durability durability) throws IOException {
279    TableName tableName = TableName.valueOf(name.replaceAll("[^A-Za-z0-9-_]", "_"));
280    TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
281      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
282    RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
283    Path path = new Path(DIR, tableName.getNameAsString());
284    if (FS.exists(path)) {
285      if (!FS.delete(path, true)) {
286        throw new IOException("Failed delete of " + path);
287      }
288    }
289    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
290      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
291    return HRegion.createHRegion(info, path, CONF, htd, wals.getWAL(info));
292  }
293
294  private HRegion createHRegion(TableDescriptor td, RegionInfo info, String dir, WAL wal,
295    Durability durability) throws IOException {
296    Path path = new Path(DIR, dir);
297    if (FS.exists(path)) {
298      if (!FS.delete(path, true)) {
299        throw new IOException("Failed delete of " + path);
300      }
301    }
302    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
303      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
304    return HRegion.createHRegion(info, path, CONF, td, wal);
305  }
306}