/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.params.provider.Arguments;

/**
 * Tests for WAL write durability
 */
@Tag(RegionServerTests.TAG)
@Tag(MediumTests.TAG)
@HBaseParameterizedTestTemplate(name = "{index}: provider={0}")
public class TestDurability {

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static FileSystem FS;
  private static MiniDFSCluster CLUSTER;
  private static Configuration CONF;
  private static Path DIR;
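
  // Column family, row key, and column qualifier shared by the test mutations below.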
  private static byte[] FAMILY = Bytes.toBytes("family");
  private static byte[] ROW = Bytes.toBytes("row");
  private static byte[] COL = Bytes.toBytes("col");

  private final String walProvider;

  private String name;

  public static Stream<Arguments> parameters() {
    return Stream.of(Arguments.of("defaultProvider"), Arguments.of("asyncfs"));
  }

  public TestDurability(String walProvider) {
    this.walProvider = walProvider;
  }

  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    CONF = TEST_UTIL.getConfiguration();
    TEST_UTIL.startMiniDFSCluster(1);

    CLUSTER = TEST_UTIL.getDFSCluster();
    FS = CLUSTER.getFileSystem();
    DIR = TEST_UTIL.getDataTestDirOnTestFS("TestDurability");
    CommonFSUtils.setRootDir(CONF, DIR);
  }

  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @BeforeEach
  public void setUp(TestInfo testInfo) {
    name = testInfo.getTestMethod().get().getName();
    CONF.set(WALFactory.WAL_PROVIDER, walProvider);
  }

  @AfterEach
  public void tearDown() throws IOException {
    FS.delete(DIR, true);
  }

  @TestTemplate
  public void testDurability() throws Exception {
    WALFactory wals = new WALFactory(CONF,
      ServerName.valueOf("TestDurability", 16010, EnvironmentEdgeManager.currentTime()).toString());
    HRegion region = createHRegion(wals, Durability.USE_DEFAULT);
    WAL wal = region.getWAL();
    HRegion deferredRegion = createHRegion(region.getTableDescriptor(), region.getRegionInfo(),
      "deferredRegion", wal, Durability.ASYNC_WAL);

    region.put(newPut(null));
    verifyWALCount(wals, wal, 1);

    // A put through the deferred table does not sync the wal immediately, though it may
    // already have been synced by the underlying AsyncWriter + AsyncFlusher threads...
    deferredRegion.put(newPut(null));
    // ...but it is guaranteed to be in the wal after we sync it explicitly
    wal.sync();
    verifyWALCount(wals, wal, 2);

    // a put through a deferred table is synced along with the next explicit wal sync
    deferredRegion.put(newPut(null));
    wal.sync();
    verifyWALCount(wals, wal, 3);
    region.put(newPut(null));
    verifyWALCount(wals, wal, 4);

    // the same holds when the put carries USE_DEFAULT explicitly
    deferredRegion.put(newPut(Durability.USE_DEFAULT));
    wal.sync();
    verifyWALCount(wals, wal, 5);
    region.put(newPut(Durability.USE_DEFAULT));
    verifyWALCount(wals, wal, 6);

    // SKIP_WAL never writes to the wal
    region.put(newPut(Durability.SKIP_WAL));
    deferredRegion.put(newPut(Durability.SKIP_WAL));
    verifyWALCount(wals, wal, 6);
    wal.sync();
    verifyWALCount(wals, wal, 6);

    // ASYNC_WAL overrides the sync table default
    region.put(newPut(Durability.ASYNC_WAL));
    deferredRegion.put(newPut(Durability.ASYNC_WAL));
    wal.sync();
    verifyWALCount(wals, wal, 8);

    // SYNC_WAL overrides the async table default
    region.put(newPut(Durability.SYNC_WAL));
    deferredRegion.put(newPut(Durability.SYNC_WAL));
    verifyWALCount(wals, wal, 10);

    // FSYNC_WAL behaves like SYNC_WAL
    region.put(newPut(Durability.FSYNC_WAL));
    deferredRegion.put(newPut(Durability.FSYNC_WAL));
    verifyWALCount(wals, wal, 12);
  }
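
  /*
   * Summary of the durability resolution exercised above: a mutation's own Durability wins
   * whenever it is anything other than USE_DEFAULT; USE_DEFAULT (or no setting) falls back to
   * the table default. SKIP_WAL writes nothing, ASYNC_WAL defers the sync until the wal is
   * synced (explicitly here, or eventually by the wal implementation), and SYNC_WAL/FSYNC_WAL
   * sync before the mutation returns.
   */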

  @TestTemplate
  public void testIncrement() throws Exception {
    byte[] row1 = Bytes.toBytes("row1");
    byte[] col1 = Bytes.toBytes("col1");
    byte[] col2 = Bytes.toBytes("col2");
    byte[] col3 = Bytes.toBytes("col3");

    // Setting up region
    WALFactory wals = new WALFactory(CONF,
      ServerName.valueOf("TestIncrement", 16010, EnvironmentEdgeManager.currentTime()).toString());
    HRegion region = createHRegion(wals, Durability.USE_DEFAULT);
    WAL wal = region.getWAL();

    // col1: amount = 0, 1 write back to WAL
    Increment inc1 = new Increment(row1);
    inc1.addColumn(FAMILY, col1, 0);
    Result res = region.increment(inc1);
    assertEquals(1, res.size());
    assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col1)));
    verifyWALCount(wals, wal, 1);

    // col1: amount = 1, 1 write back to WAL
    inc1 = new Increment(row1);
    inc1.addColumn(FAMILY, col1, 1);
    res = region.increment(inc1);
    assertEquals(1, res.size());
    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
    verifyWALCount(wals, wal, 2);

    // col1: amount = 0, 1 write back to WAL
    inc1 = new Increment(row1);
    inc1.addColumn(FAMILY, col1, 0);
    res = region.increment(inc1);
    assertEquals(1, res.size());
    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
    verifyWALCount(wals, wal, 3);

    // col1: amount = 0, col2: amount = 0, col3: amount = 0
    // 1 write back to WAL
    inc1 = new Increment(row1);
    inc1.addColumn(FAMILY, col1, 0);
    inc1.addColumn(FAMILY, col2, 0);
    inc1.addColumn(FAMILY, col3, 0);
    res = region.increment(inc1);
    assertEquals(3, res.size());
    assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
    assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col2)));
    assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col3)));
    verifyWALCount(wals, wal, 4);

    // col1: amount = 5, col2: amount = 4, col3: amount = 3
    // 1 write back to WAL
    inc1 = new Increment(row1);
    inc1.addColumn(FAMILY, col1, 5);
    inc1.addColumn(FAMILY, col2, 4);
    inc1.addColumn(FAMILY, col3, 3);
    res = region.increment(inc1);
    assertEquals(3, res.size());
    assertEquals(6, Bytes.toLong(res.getValue(FAMILY, col1)));
    assertEquals(4, Bytes.toLong(res.getValue(FAMILY, col2)));
    assertEquals(3, Bytes.toLong(res.getValue(FAMILY, col3)));
    verifyWALCount(wals, wal, 5);
  }
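
  /*
   * Note from the counts above: each Increment produces exactly one wal entry regardless of how
   * many columns it touches, and an increment of amount 0 still writes the (unchanged) value
   * back to the wal.
   */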

  /**
   * Test that when returnResults is set to false on an increment, the incremented values are not
   * returned; the Result is empty.
   */
  @TestTemplate
  public void testIncrementWithReturnResultsSetToFalse() throws Exception {
    byte[] row1 = Bytes.toBytes("row1");
    byte[] col1 = Bytes.toBytes("col1");

    // Setting up region
    WALFactory wals =
      new WALFactory(CONF, ServerName.valueOf("testIncrementWithReturnResultsSetToFalse", 16010,
        EnvironmentEdgeManager.currentTime()).toString());
    HRegion region = createHRegion(wals, Durability.USE_DEFAULT);

    Increment inc1 = new Increment(row1);
    inc1.setReturnResults(false);
    inc1.addColumn(FAMILY, col1, 1);
    Result res = region.increment(inc1);
    assertTrue(res.isEmpty());
  }

  private Put newPut(Durability durability) {
    Put p = new Put(ROW);
    p.addColumn(FAMILY, COL, COL);
    if (durability != null) {
      p.setDurability(durability);
    }
    return p;
  }

  private void verifyWALCount(WALFactory wals, WAL log, int expected) throws Exception {
    Path walPath = AbstractFSWALProvider.getCurrentFileName(log);
    assertEquals(expected, NoEOFWALStreamReader.count(wals, FS, walPath));
  }

  // lifted from TestAtomicOperation
  private HRegion createHRegion(WALFactory wals, Durability durability) throws IOException {
    TableName tableName = TableName.valueOf(name.replaceAll("[^A-Za-z0-9-_]", "_"));
    // Apply the requested durability as the table default so USE_DEFAULT mutations pick it up.
    TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setDurability(durability).build();
    RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
    Path path = new Path(DIR, tableName.getNameAsString());
    if (FS.exists(path)) {
      if (!FS.delete(path, true)) {
        throw new IOException("Failed delete of " + path);
      }
    }
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
    return HRegion.createHRegion(info, path, CONF, htd, wals.getWAL(info));
  }

  private HRegion createHRegion(TableDescriptor td, RegionInfo info, String dir, WAL wal,
    Durability durability) throws IOException {
    Path path = new Path(DIR, dir);
    if (FS.exists(path)) {
      if (!FS.delete(path, true)) {
        throw new IOException("Failed delete of " + path);
      }
    }
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
    // Rebuild the descriptor with the requested durability as the table default; testDurability
    // relies on the deferred region's table default being ASYNC_WAL.
    td = TableDescriptorBuilder.newBuilder(td).setDurability(durability).build();
    return HRegion.createHRegion(info, path, CONF, td, wal);
  }
}