001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.Arrays; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseTestingUtility; 030import org.apache.hadoop.hbase.ServerName; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 033import org.apache.hadoop.hbase.client.Durability; 034import org.apache.hadoop.hbase.client.Increment; 035import org.apache.hadoop.hbase.client.Put; 036import org.apache.hadoop.hbase.client.RegionInfo; 037import org.apache.hadoop.hbase.client.RegionInfoBuilder; 038import org.apache.hadoop.hbase.client.Result; 039import org.apache.hadoop.hbase.client.TableDescriptor; 040import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 041import org.apache.hadoop.hbase.regionserver.ChunkCreator; 042import org.apache.hadoop.hbase.regionserver.HRegion; 043import org.apache.hadoop.hbase.regionserver.MemStoreLAB; 044import org.apache.hadoop.hbase.testclassification.MediumTests; 045import org.apache.hadoop.hbase.testclassification.RegionServerTests; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.apache.hadoop.hbase.util.CommonFSUtils; 048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 049import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 050import org.apache.hadoop.hbase.wal.WAL; 051import org.apache.hadoop.hbase.wal.WALFactory; 052import org.apache.hadoop.hdfs.MiniDFSCluster; 053import org.junit.After; 054import org.junit.AfterClass; 055import org.junit.Before; 056import org.junit.BeforeClass; 057import org.junit.ClassRule; 058import org.junit.Rule; 059import org.junit.Test; 060import org.junit.experimental.categories.Category; 061import org.junit.rules.TestName; 062import org.junit.runner.RunWith; 063import org.junit.runners.Parameterized; 064import org.junit.runners.Parameterized.Parameter; 065import org.junit.runners.Parameterized.Parameters; 066 067/** 068 * Tests for WAL write durability 069 */ 070@RunWith(Parameterized.class) 071@Category({ RegionServerTests.class, MediumTests.class }) 072public class TestDurability { 073 074 @ClassRule 075 public static final HBaseClassTestRule CLASS_RULE = 076 HBaseClassTestRule.forClass(TestDurability.class); 077 078 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 079 private static FileSystem FS; 080 private static MiniDFSCluster CLUSTER; 081 private static Configuration CONF; 082 private static Path DIR; 083 084 private static byte[] FAMILY = Bytes.toBytes("family"); 085 private static byte[] ROW = Bytes.toBytes("row"); 086 private static byte[] COL = Bytes.toBytes("col"); 087 088 @Parameter 089 public String walProvider; 090 091 @Rule 092 public TestName name = new TestName(); 093 094 @Parameters(name = "{index}: provider={0}") 095 public static Iterable<Object[]> data() { 096 return Arrays.asList(new Object[] { "defaultProvider" }, new Object[] { "asyncfs" }); 097 } 098 099 @BeforeClass 100 public static void setUpBeforeClass() throws Exception { 101 CONF = TEST_UTIL.getConfiguration(); 102 TEST_UTIL.startMiniDFSCluster(1); 103 104 CLUSTER = TEST_UTIL.getDFSCluster(); 105 FS = CLUSTER.getFileSystem(); 106 DIR = TEST_UTIL.getDataTestDirOnTestFS("TestDurability"); 107 CommonFSUtils.setRootDir(CONF, DIR); 108 } 109 110 @AfterClass 111 public static void tearDownAfterClass() throws Exception { 112 TEST_UTIL.shutdownMiniCluster(); 113 } 114 115 @Before 116 public void setUp() { 117 CONF.set(WALFactory.WAL_PROVIDER, walProvider); 118 } 119 120 @After 121 public void tearDown() throws IOException { 122 FS.delete(DIR, true); 123 } 124 125 @Test 126 public void testDurability() throws Exception { 127 WALFactory wals = new WALFactory(CONF, 128 ServerName.valueOf("TestDurability", 16010, EnvironmentEdgeManager.currentTime()).toString()); 129 HRegion region = createHRegion(wals, Durability.USE_DEFAULT); 130 WAL wal = region.getWAL(); 131 HRegion deferredRegion = createHRegion(region.getTableDescriptor(), region.getRegionInfo(), 132 "deferredRegion", wal, Durability.ASYNC_WAL); 133 134 region.put(newPut(null)); 135 verifyWALCount(wals, wal, 1); 136 137 // a put through the deferred table does not write to the wal immediately, 138 // but maybe has been successfully sync-ed by the underlying AsyncWriter + 139 // AsyncFlusher thread 140 deferredRegion.put(newPut(null)); 141 // but will after we sync the wal 142 wal.sync(); 143 verifyWALCount(wals, wal, 2); 144 145 // a put through a deferred table will be sync with the put sync'ed put 146 deferredRegion.put(newPut(null)); 147 wal.sync(); 148 verifyWALCount(wals, wal, 3); 149 region.put(newPut(null)); 150 verifyWALCount(wals, wal, 4); 151 152 // a put through a deferred table will be sync with the put sync'ed put 153 deferredRegion.put(newPut(Durability.USE_DEFAULT)); 154 wal.sync(); 155 verifyWALCount(wals, wal, 5); 156 region.put(newPut(Durability.USE_DEFAULT)); 157 verifyWALCount(wals, wal, 6); 158 159 // SKIP_WAL never writes to the wal 160 region.put(newPut(Durability.SKIP_WAL)); 161 deferredRegion.put(newPut(Durability.SKIP_WAL)); 162 verifyWALCount(wals, wal, 6); 163 wal.sync(); 164 verifyWALCount(wals, wal, 6); 165 166 // Async overrides sync table default 167 region.put(newPut(Durability.ASYNC_WAL)); 168 deferredRegion.put(newPut(Durability.ASYNC_WAL)); 169 wal.sync(); 170 verifyWALCount(wals, wal, 8); 171 172 // sync overrides async table default 173 region.put(newPut(Durability.SYNC_WAL)); 174 deferredRegion.put(newPut(Durability.SYNC_WAL)); 175 verifyWALCount(wals, wal, 10); 176 177 // fsync behaves like sync 178 region.put(newPut(Durability.FSYNC_WAL)); 179 deferredRegion.put(newPut(Durability.FSYNC_WAL)); 180 verifyWALCount(wals, wal, 12); 181 } 182 183 @Test 184 public void testIncrement() throws Exception { 185 byte[] row1 = Bytes.toBytes("row1"); 186 byte[] col1 = Bytes.toBytes("col1"); 187 byte[] col2 = Bytes.toBytes("col2"); 188 byte[] col3 = Bytes.toBytes("col3"); 189 190 // Setting up region 191 WALFactory wals = new WALFactory(CONF, 192 ServerName.valueOf("TestIncrement", 16010, EnvironmentEdgeManager.currentTime()).toString()); 193 HRegion region = createHRegion(wals, Durability.USE_DEFAULT); 194 WAL wal = region.getWAL(); 195 196 // col1: amount = 0, 1 write back to WAL 197 Increment inc1 = new Increment(row1); 198 inc1.addColumn(FAMILY, col1, 0); 199 Result res = region.increment(inc1); 200 assertEquals(1, res.size()); 201 assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col1))); 202 verifyWALCount(wals, wal, 1); 203 204 // col1: amount = 1, 1 write back to WAL 205 inc1 = new Increment(row1); 206 inc1.addColumn(FAMILY, col1, 1); 207 res = region.increment(inc1); 208 assertEquals(1, res.size()); 209 assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1))); 210 verifyWALCount(wals, wal, 2); 211 212 // col1: amount = 0, 1 write back to WAL 213 inc1 = new Increment(row1); 214 inc1.addColumn(FAMILY, col1, 0); 215 res = region.increment(inc1); 216 assertEquals(1, res.size()); 217 assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1))); 218 verifyWALCount(wals, wal, 3); 219 // col1: amount = 0, col2: amount = 0, col3: amount = 0 220 // 1 write back to WAL 221 inc1 = new Increment(row1); 222 inc1.addColumn(FAMILY, col1, 0); 223 inc1.addColumn(FAMILY, col2, 0); 224 inc1.addColumn(FAMILY, col3, 0); 225 res = region.increment(inc1); 226 assertEquals(3, res.size()); 227 assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1))); 228 assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col2))); 229 assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col3))); 230 verifyWALCount(wals, wal, 4); 231 232 // col1: amount = 5, col2: amount = 4, col3: amount = 3 233 // 1 write back to WAL 234 inc1 = new Increment(row1); 235 inc1.addColumn(FAMILY, col1, 5); 236 inc1.addColumn(FAMILY, col2, 4); 237 inc1.addColumn(FAMILY, col3, 3); 238 res = region.increment(inc1); 239 assertEquals(3, res.size()); 240 assertEquals(6, Bytes.toLong(res.getValue(FAMILY, col1))); 241 assertEquals(4, Bytes.toLong(res.getValue(FAMILY, col2))); 242 assertEquals(3, Bytes.toLong(res.getValue(FAMILY, col3))); 243 verifyWALCount(wals, wal, 5); 244 } 245 246 /** 247 * Test when returnResults set to false in increment it should not return the result instead it 248 * resturn null. 249 */ 250 @Test 251 public void testIncrementWithReturnResultsSetToFalse() throws Exception { 252 byte[] row1 = Bytes.toBytes("row1"); 253 byte[] col1 = Bytes.toBytes("col1"); 254 255 // Setting up region 256 WALFactory wals = 257 new WALFactory(CONF, ServerName.valueOf("testIncrementWithReturnResultsSetToFalse", 16010, 258 EnvironmentEdgeManager.currentTime()).toString()); 259 HRegion region = createHRegion(wals, Durability.USE_DEFAULT); 260 261 Increment inc1 = new Increment(row1); 262 inc1.setReturnResults(false); 263 inc1.addColumn(FAMILY, col1, 1); 264 Result res = region.increment(inc1); 265 assertTrue(res.isEmpty()); 266 } 267 268 private Put newPut(Durability durability) { 269 Put p = new Put(ROW); 270 p.addColumn(FAMILY, COL, COL); 271 if (durability != null) { 272 p.setDurability(durability); 273 } 274 return p; 275 } 276 277 private void verifyWALCount(WALFactory wals, WAL log, int expected) throws Exception { 278 Path walPath = AbstractFSWALProvider.getCurrentFileName(log); 279 WAL.Reader reader = wals.createReader(FS, walPath); 280 int count = 0; 281 WAL.Entry entry = new WAL.Entry(); 282 while (reader.next(entry) != null) { 283 count++; 284 } 285 reader.close(); 286 assertEquals(expected, count); 287 } 288 289 // lifted from TestAtomicOperation 290 private HRegion createHRegion(WALFactory wals, Durability durability) throws IOException { 291 TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^A-Za-z0-9-_]", "_")); 292 TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName) 293 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build(); 294 RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); 295 Path path = new Path(DIR, tableName.getNameAsString()); 296 if (FS.exists(path)) { 297 if (!FS.delete(path, true)) { 298 throw new IOException("Failed delete of " + path); 299 } 300 } 301 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 302 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 303 return HRegion.createHRegion(info, path, CONF, htd, wals.getWAL(info)); 304 } 305 306 private HRegion createHRegion(TableDescriptor td, RegionInfo info, String dir, WAL wal, 307 Durability durability) throws IOException { 308 Path path = new Path(DIR, dir); 309 if (FS.exists(path)) { 310 if (!FS.delete(path, true)) { 311 throw new IOException("Failed delete of " + path); 312 } 313 } 314 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 315 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 316 return HRegion.createHRegion(info, path, CONF, td, wal); 317 } 318}