/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

/**
 * Tests for WAL write durability
 */
@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, MediumTests.class })
public class TestDurability {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestDurability.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static FileSystem FS;
  private static MiniDFSCluster CLUSTER;
  private static Configuration CONF;
  private static Path DIR;

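  // Row, column family and qualifier shared by every put issued by these tests.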
  private static byte[] FAMILY = Bytes.toBytes("family");
  private static byte[] ROW = Bytes.toBytes("row");
  private static byte[] COL = Bytes.toBytes("col");

  // WAL provider under test, injected by the parameterized runner; see data().
  @Parameter
  public String walProvider;

  @Rule
  public TestName name = new TestName();

  @Parameters(name = "{index}: provider={0}")
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[] { "defaultProvider" }, new Object[] { "asyncfs" });
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    CONF = TEST_UTIL.getConfiguration();
    TEST_UTIL.startMiniDFSCluster(1);

    CLUSTER = TEST_UTIL.getDFSCluster();
    FS = CLUSTER.getFileSystem();
    DIR = TEST_UTIL.getDataTestDirOnTestFS("TestDurability");
    FSUtils.setRootDir(CONF, DIR);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() {
    CONF.set(WALFactory.WAL_PROVIDER, walProvider);
  }

  @After
  public void tearDown() throws IOException {
    FS.delete(DIR, true);
  }

  @Test
  public void testDurability() throws Exception {
    WALFactory wals = new WALFactory(CONF,
      ServerName.valueOf("TestDurability", 16010, System.currentTimeMillis()).toString());
    HRegion region = createHRegion(wals, Durability.USE_DEFAULT);
    WAL wal = region.getWAL();
    HRegion deferredRegion = createHRegion(region.getTableDescriptor(), region.getRegionInfo(),
      "deferredRegion", wal, Durability.ASYNC_WAL);

    region.put(newPut(null));
    verifyWALCount(wals, wal, 1);

    // A put through the deferred region is not sync'ed to the WAL immediately, though it may
    // already have been sync'ed by the underlying AsyncWriter + AsyncFlusher thread.
    deferredRegion.put(newPut(null));
    // It is guaranteed to be visible once we sync the WAL explicitly.
    wal.sync();
    verifyWALCount(wals, wal, 2);

    // A put through the deferred region is written out together with the next sync'ed put.
    deferredRegion.put(newPut(null));
    wal.sync();
    verifyWALCount(wals, wal, 3);
    region.put(newPut(null));
    verifyWALCount(wals, wal, 4);

    // The same holds when the puts carry an explicit USE_DEFAULT durability.
    deferredRegion.put(newPut(Durability.USE_DEFAULT));
    wal.sync();
    verifyWALCount(wals, wal, 5);
    region.put(newPut(Durability.USE_DEFAULT));
    verifyWALCount(wals, wal, 6);

    // SKIP_WAL never writes to the WAL.
    region.put(newPut(Durability.SKIP_WAL));
    deferredRegion.put(newPut(Durability.SKIP_WAL));
    verifyWALCount(wals, wal, 6);
    wal.sync();
    verifyWALCount(wals, wal, 6);

    // ASYNC_WAL overrides the sync table default.
    region.put(newPut(Durability.ASYNC_WAL));
    deferredRegion.put(newPut(Durability.ASYNC_WAL));
    wal.sync();
    verifyWALCount(wals, wal, 8);

    // SYNC_WAL overrides the async table default.
    region.put(newPut(Durability.SYNC_WAL));
    deferredRegion.put(newPut(Durability.SYNC_WAL));
    verifyWALCount(wals, wal, 10);

    // FSYNC_WAL behaves like SYNC_WAL here.
    region.put(newPut(Durability.FSYNC_WAL));
    deferredRegion.put(newPut(Durability.FSYNC_WAL));
    verifyWALCount(wals, wal, 12);
  }

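  // Note on the accounting below: each Increment is written to the WAL as a single entry, no
  // matter how many columns it touches, so the expected count advances by one per increment call.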
  @Test
  public void testIncrement() throws Exception {
    byte[] row1 = Bytes.toBytes("row1");
    byte[] col1 = Bytes.toBytes("col1");
    byte[] col2 = Bytes.toBytes("col2");
    byte[] col3 = Bytes.toBytes("col3");

    // Setting up region
    WALFactory wals = new WALFactory(CONF,
      ServerName.valueOf("TestIncrement", 16010, System.currentTimeMillis()).toString());
    HRegion region = createHRegion(wals, Durability.USE_DEFAULT);
    WAL wal = region.getWAL();

    // col1: amount = 0, one write to the WAL
    Increment inc1 = new Increment(row1);
    inc1.addColumn(FAMILY, col1, 0);
    Result res = region.increment(inc1);
    assertEquals(1, res.size());
    assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col1)));
    verifyWALCount(wals, wal, 1);

    // col1: amount = 1, one write to the WAL
    inc1 = new Increment(row1);
    inc1.addColumn(FAMILY, col1, 1);
    res = region.increment(inc1);
    assertEquals(1, res.size());
    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
    verifyWALCount(wals, wal, 2);

    // col1: amount = 0, one write to the WAL
    inc1 = new Increment(row1);
    inc1.addColumn(FAMILY, col1, 0);
    res = region.increment(inc1);
    assertEquals(1, res.size());
    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
    verifyWALCount(wals, wal, 3);

    // col1: amount = 0, col2: amount = 0, col3: amount = 0,
    // still only one write to the WAL
    inc1 = new Increment(row1);
    inc1.addColumn(FAMILY, col1, 0);
    inc1.addColumn(FAMILY, col2, 0);
    inc1.addColumn(FAMILY, col3, 0);
    res = region.increment(inc1);
    assertEquals(3, res.size());
    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
    assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col2)));
    assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col3)));
    verifyWALCount(wals, wal, 4);

    // col1: amount = 5, col2: amount = 4, col3: amount = 3,
    // still only one write to the WAL
    inc1 = new Increment(row1);
    inc1.addColumn(FAMILY, col1, 5);
    inc1.addColumn(FAMILY, col2, 4);
    inc1.addColumn(FAMILY, col3, 3);
    res = region.increment(inc1);
    assertEquals(3, res.size());
    assertEquals(6, Bytes.toLong(res.getValue(FAMILY, col1)));
    assertEquals(4, Bytes.toLong(res.getValue(FAMILY, col2)));
    assertEquals(3, Bytes.toLong(res.getValue(FAMILY, col3)));
    verifyWALCount(wals, wal, 5);
  }

  /**
   * Test that an increment with returnResults set to false does not hand the new values back;
   * the returned Result is empty instead.
   */
  @Test
  public void testIncrementWithReturnResultsSetToFalse() throws Exception {
    byte[] row1 = Bytes.toBytes("row1");
    byte[] col1 = Bytes.toBytes("col1");

    // Setting up region
    WALFactory wals = new WALFactory(CONF,
      ServerName
        .valueOf("testIncrementWithReturnResultsSetToFalse", 16010, System.currentTimeMillis())
        .toString());
    HRegion region = createHRegion(wals, Durability.USE_DEFAULT);

    Increment inc1 = new Increment(row1);
    inc1.setReturnResults(false);
    inc1.addColumn(FAMILY, col1, 1);
    Result res = region.increment(inc1);
    assertTrue(res.isEmpty());
  }

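  // Builds a single-cell put against the shared row; a null durability leaves the put at its
  // default (USE_DEFAULT), so the table-level durability decides how it is sync'ed.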
  private Put newPut(Durability durability) {
    Put p = new Put(ROW);
    p.addColumn(FAMILY, COL, COL);
    if (durability != null) {
      p.setDurability(durability);
    }
    return p;
  }

  // Counts the entries readable from the WAL's current file and asserts the expected total.
  private void verifyWALCount(WALFactory wals, WAL log, int expected) throws Exception {
    Path walPath = AbstractFSWALProvider.getCurrentFileName(log);
    int count = 0;
    try (WAL.Reader reader = wals.createReader(FS, walPath)) {
      WAL.Entry entry = new WAL.Entry();
      while (reader.next(entry) != null) {
        count++;
      }
    }
    assertEquals(expected, count);
  }

  // lifted from TestAtomicOperation
  private HRegion createHRegion(WALFactory wals, Durability durability) throws IOException {
    TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^A-Za-z0-9-_]", "_"));
    // Apply the requested durability so the table default matches what the caller asked for.
    TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setDurability(durability).build();
    RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
    Path path = new Path(DIR, tableName.getNameAsString());
    if (FS.exists(path)) {
      if (!FS.delete(path, true)) {
        throw new IOException("Failed delete of " + path);
      }
    }
    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
    return HRegion.createHRegion(info, path, CONF, htd, wals.getWAL(info));
  }

  private HRegion createHRegion(TableDescriptor td, RegionInfo info, String dir, WAL wal,
      Durability durability) throws IOException {
    Path path = new Path(DIR, dir);
    if (FS.exists(path)) {
      if (!FS.delete(path, true)) {
        throw new IOException("Failed delete of " + path);
      }
    }
    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
    // Apply the requested durability to the descriptor so the deferred region created in
    // testDurability really runs with ASYNC_WAL semantics.
    td = TableDescriptorBuilder.newBuilder(td).setDurability(durability).build();
    return HRegion.createHRegion(info, path, CONF, td, wal);
  }
}