/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

/**
 * Provides FSHLog test cases.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestFSHLog extends AbstractTestFSWAL {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestFSHLog.class);

  private static final long TEST_TIMEOUT_MS = 10000;

  @Rule
  public TestName name = new TestName();

  @Override
  protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String walDir, String archiveDir,
    Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix,
    String suffix) throws IOException {
    FSHLog fshLog =
      new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
    fshLog.init();
    return fshLog;
  }

  @Override
  protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String walDir,
    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
    boolean failIfWALExists, String prefix, String suffix, final Runnable action)
    throws IOException {
    FSHLog fshLog = new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists,
      prefix, suffix) {

      @Override
      protected void atHeadOfRingBufferEventHandlerAppend() {
        action.run();
        super.atHeadOfRingBufferEventHandlerAppend();
      }
    };
    fshLog.init();
    return fshLog;
  }

  @Test
  public void testSyncRunnerIndexOverflow() throws IOException, NoSuchFieldException,
    SecurityException, IllegalArgumentException, IllegalAccessException {
    final String name = this.name.getMethodName();
    FSHLog log = new FSHLog(FS, CommonFSUtils.getRootDir(CONF), name,
      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
    log.init();
    try {
      Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler");
      ringBufferEventHandlerField.setAccessible(true);
      FSHLog.RingBufferEventHandler ringBufferEventHandler =
        (FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log);
      Field syncRunnerIndexField =
        FSHLog.RingBufferEventHandler.class.getDeclaredField("syncRunnerIndex");
      syncRunnerIndexField.setAccessible(true);
      syncRunnerIndexField.set(ringBufferEventHandler, Integer.MAX_VALUE - 1);
      TableDescriptor htd =
        TableDescriptorBuilder.newBuilder(TableName.valueOf(this.name.getMethodName()))
          .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
      NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (byte[] fam : htd.getColumnFamilyNames()) {
        scopes.put(fam, 0);
      }
      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
      MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
      for (int i = 0; i < 10; i++) {
        addEdits(log, hri, htd, 1, mvcc, scopes, "row");
      }
    } finally {
      log.close();
    }
  }

  /**
   * Test for WAL stall due to sync future overwrites. See HBASE-25984.
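   * <p>
   * A writer that always fails sync leaves a completed sync future outstanding while the ring
   * buffer handler is forced to a safe point; a second sync published from the same thread must
   * not overwrite that completed future, otherwise the sync pipeline stalls.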
   */
  @Test
  public void testDeadlockWithSyncOverwrites() throws Exception {
    final CountDownLatch blockBeforeSafePoint = new CountDownLatch(1);

    class FailingWriter implements WALProvider.Writer {
      @Override
      public void sync(boolean forceSync) throws IOException {
        throw new IOException("Injected failure..");
      }

      @Override
      public void append(WAL.Entry entry) throws IOException {
      }

      @Override
      public long getLength() {
        return 0;
      }

      @Override
      public long getSyncedLength() {
        return 0;
      }

      @Override
      public void close() throws IOException {
      }
    }

    /*
     * Custom FSHLog implementation with a conditional wait before attaining safe point.
     */
    class CustomFSHLog extends FSHLog {
      public CustomFSHLog(FileSystem fs, Path rootDir, String logDir, String archiveDir,
        Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
        String prefix, String suffix) throws IOException {
        super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
      }

      @Override
      protected void beforeWaitOnSafePoint() {
        try {
          assertTrue(blockBeforeSafePoint.await(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }

      public SyncFuture publishSyncOnRingBuffer() {
        long sequence = getSequenceOnRingBuffer();
        return publishSyncOnRingBuffer(sequence, false);
      }
    }

    final String name = this.name.getMethodName();
    try (CustomFSHLog log = new CustomFSHLog(FS, CommonFSUtils.getRootDir(CONF), name,
      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
      log.setWriter(new FailingWriter());
      Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler");
      ringBufferEventHandlerField.setAccessible(true);
      FSHLog.RingBufferEventHandler ringBufferEventHandler =
        (FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log);
      // Force a safe point
      FSHLog.SafePointZigZagLatch latch = ringBufferEventHandler.attainSafePoint();
      try {
        SyncFuture future0 = log.publishSyncOnRingBuffer();
        // Wait for the sync to be done.
        Waiter.waitFor(CONF, TEST_TIMEOUT_MS, future0::isDone);
        // Publish another sync from the same thread, this should not overwrite the done sync.
        SyncFuture future1 = log.publishSyncOnRingBuffer();
        assertFalse(future1.isDone());
        // Unblock the safe point trigger..
        blockBeforeSafePoint.countDown();
        // Wait for the safe point to be reached.
        // With the deadlock in HBASE-25984, this is never possible, thus blocking the sync
        // pipeline.
        Waiter.waitFor(CONF, TEST_TIMEOUT_MS, latch::isSafePointAttained);
      } finally {
        // Force release the safe point, for the clean up.
        latch.releaseSafePoint();
      }
    }
  }

  /**
   * Test case for https://issues.apache.org/jira/browse/HBASE-16721
   */
  @Test
  public void testUnflushedSeqIdTracking() throws IOException, InterruptedException {
    final String name = this.name.getMethodName();
    final byte[] b = Bytes.toBytes("b");

    final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
    final CountDownLatch holdAppend = new CountDownLatch(1);
    final CountDownLatch flushFinished = new CountDownLatch(1);
    final CountDownLatch putFinished = new CountDownLatch(1);

    try (FSHLog log = new FSHLog(FS, CommonFSUtils.getRootDir(CONF), name,
      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
      log.init();
      log.registerWALActionsListener(new WALActionsListener() {
        @Override
        public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
          if (startHoldingForAppend.get()) {
            try {
              holdAppend.await();
            } catch (InterruptedException e) {
              LOG.error(e.toString(), e);
            }
          }
        }
      });

      // open a new region which uses this WAL
      TableDescriptor htd =
        TableDescriptorBuilder.newBuilder(TableName.valueOf(this.name.getMethodName()))
          .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
      ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
        MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
      final HRegion region = TEST_UTIL.createLocalHRegion(hri, CONF, htd, log);
      ExecutorService exec = Executors.newFixedThreadPool(2);

      // do a regular write first because of memstore size calculation.
      region.put(new Put(b).addColumn(b, b, b));

      startHoldingForAppend.set(true);
      exec.submit(new Runnable() {
        @Override
        public void run() {
          try {
            region.put(new Put(b).addColumn(b, b, b));
            putFinished.countDown();
          } catch (IOException e) {
            LOG.error(e.toString(), e);
          }
        }
      });

      // give the put a chance to start
      Threads.sleep(3000);

      exec.submit(new Runnable() {
        @Override
        public void run() {
          try {
            HRegion.FlushResult flushResult = region.flush(true);
            LOG.info("Flush result:" + flushResult.getResult());
            LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded());
            flushFinished.countDown();
          } catch (IOException e) {
            LOG.error(e.toString(), e);
          }
        }
      });

      // give the flush a chance to start. Flush should have got the region lock, and
      // should have been waiting on the mvcc complete after this.
      Threads.sleep(3000);

      // let the append to WAL go through now that the flush already started
      holdAppend.countDown();
      putFinished.await();
      flushFinished.await();

      // check whether flush went through
      assertEquals("Region did not flush?", 1, region.getStoreFileList(new byte[][] { b }).size());

      // now check the region's unflushed seqIds.
      long seqId = log.getEarliestMemStoreSeqNum(hri.getEncodedNameAsBytes());
      assertEquals("Found seqId for the region which is already flushed", HConstants.NO_SEQNUM,
        seqId);

      region.close();
    }
  }
}