001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.junit.Assert.assertEquals; 021 022import java.io.IOException; 023import java.lang.reflect.Field; 024import java.util.List; 025import java.util.NavigableMap; 026import java.util.TreeMap; 027import java.util.concurrent.CountDownLatch; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Executors; 030import java.util.concurrent.atomic.AtomicBoolean; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 038import org.apache.hadoop.hbase.client.Put; 039import org.apache.hadoop.hbase.client.RegionInfo; 040import org.apache.hadoop.hbase.client.RegionInfoBuilder; 041import org.apache.hadoop.hbase.client.TableDescriptor; 042import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 043import org.apache.hadoop.hbase.regionserver.ChunkCreator; 044import org.apache.hadoop.hbase.regionserver.HRegion; 045import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; 046import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 047import org.apache.hadoop.hbase.testclassification.MediumTests; 048import org.apache.hadoop.hbase.testclassification.RegionServerTests; 049import org.apache.hadoop.hbase.util.Bytes; 050import org.apache.hadoop.hbase.util.FSUtils; 051import org.apache.hadoop.hbase.util.Threads; 052import org.apache.hadoop.hbase.wal.WALEdit; 053import org.apache.hadoop.hbase.wal.WALKey; 054import org.junit.ClassRule; 055import org.junit.Rule; 056import org.junit.Test; 057import org.junit.experimental.categories.Category; 058import org.junit.rules.TestName; 059 060/** 061 * Provides FSHLog test cases. 062 */ 063@Category({ RegionServerTests.class, MediumTests.class }) 064public class TestFSHLog extends AbstractTestFSWAL { 065 066 @ClassRule 067 public static final HBaseClassTestRule CLASS_RULE = 068 HBaseClassTestRule.forClass(TestFSHLog.class); 069 070 @Rule 071 public TestName name = new TestName(); 072 073 @Override 074 protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String walDir, String archiveDir, 075 Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, 076 String prefix, String suffix) throws IOException { 077 return new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, prefix, 078 suffix); 079 } 080 081 @Override 082 protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String walDir, 083 String archiveDir, Configuration conf, List<WALActionsListener> listeners, 084 boolean failIfWALExists, String prefix, String suffix, final Runnable action) 085 throws IOException { 086 return new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, failIfWALExists, prefix, 087 suffix) { 088 089 @Override 090 protected void atHeadOfRingBufferEventHandlerAppend() { 091 action.run(); 092 super.atHeadOfRingBufferEventHandlerAppend(); 093 } 094 }; 095 } 096 097 @Test 098 public void testSyncRunnerIndexOverflow() throws IOException, NoSuchFieldException, 099 SecurityException, IllegalArgumentException, IllegalAccessException { 100 final String name = this.name.getMethodName(); 101 FSHLog log = new FSHLog(FS, FSUtils.getRootDir(CONF), name, HConstants.HREGION_OLDLOGDIR_NAME, 102 CONF, null, true, null, null); 103 try { 104 Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler"); 105 ringBufferEventHandlerField.setAccessible(true); 106 FSHLog.RingBufferEventHandler ringBufferEventHandler = 107 (FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log); 108 Field syncRunnerIndexField = 109 FSHLog.RingBufferEventHandler.class.getDeclaredField("syncRunnerIndex"); 110 syncRunnerIndexField.setAccessible(true); 111 syncRunnerIndexField.set(ringBufferEventHandler, Integer.MAX_VALUE - 1); 112 TableDescriptor htd = 113 TableDescriptorBuilder.newBuilder(TableName.valueOf(this.name.getMethodName())) 114 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build(); 115 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 116 for (byte[] fam : htd.getColumnFamilyNames()) { 117 scopes.put(fam, 0); 118 } 119 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 120 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 121 for (int i = 0; i < 10; i++) { 122 addEdits(log, hri, htd, 1, mvcc, scopes); 123 } 124 } finally { 125 log.close(); 126 } 127 } 128 129 /** 130 * Test case for https://issues.apache.org/jira/browse/HBASE-16721 131 */ 132 @Test 133 public void testUnflushedSeqIdTracking() throws IOException, InterruptedException { 134 final String name = this.name.getMethodName(); 135 final byte[] b = Bytes.toBytes("b"); 136 137 final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false); 138 final CountDownLatch holdAppend = new CountDownLatch(1); 139 final CountDownLatch flushFinished = new CountDownLatch(1); 140 final CountDownLatch putFinished = new CountDownLatch(1); 141 142 try (FSHLog log = 143 new FSHLog(FS, FSUtils.getRootDir(CONF), name, HConstants.HREGION_OLDLOGDIR_NAME, CONF, 144 null, true, null, null)) { 145 146 log.registerWALActionsListener(new WALActionsListener() { 147 @Override 148 public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) 149 throws IOException { 150 if (startHoldingForAppend.get()) { 151 try { 152 holdAppend.await(); 153 } catch (InterruptedException e) { 154 LOG.error(e.toString(), e); 155 } 156 } 157 } 158 }); 159 160 // open a new region which uses this WAL 161 TableDescriptor htd = 162 TableDescriptorBuilder.newBuilder(TableName.valueOf(this.name.getMethodName())) 163 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build(); 164 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 165 ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); 166 final HRegion region = TEST_UTIL.createLocalHRegion(hri, htd, log); 167 ExecutorService exec = Executors.newFixedThreadPool(2); 168 169 // do a regular write first because of memstore size calculation. 170 region.put(new Put(b).addColumn(b, b,b)); 171 172 startHoldingForAppend.set(true); 173 exec.submit(new Runnable() { 174 @Override 175 public void run() { 176 try { 177 region.put(new Put(b).addColumn(b, b,b)); 178 putFinished.countDown(); 179 } catch (IOException e) { 180 LOG.error(e.toString(), e); 181 } 182 } 183 }); 184 185 // give the put a chance to start 186 Threads.sleep(3000); 187 188 exec.submit(new Runnable() { 189 @Override 190 public void run() { 191 try { 192 HRegion.FlushResult flushResult = region.flush(true); 193 LOG.info("Flush result:" + flushResult.getResult()); 194 LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded()); 195 flushFinished.countDown(); 196 } catch (IOException e) { 197 LOG.error(e.toString(), e); 198 } 199 } 200 }); 201 202 // give the flush a chance to start. Flush should have got the region lock, and 203 // should have been waiting on the mvcc complete after this. 204 Threads.sleep(3000); 205 206 // let the append to WAL go through now that the flush already started 207 holdAppend.countDown(); 208 putFinished.await(); 209 flushFinished.await(); 210 211 // check whether flush went through 212 assertEquals("Region did not flush?", 1, region.getStoreFileList(new byte[][]{b}).size()); 213 214 // now check the region's unflushed seqIds. 215 long seqId = log.getEarliestMemStoreSeqNum(hri.getEncodedNameAsBytes()); 216 assertEquals("Found seqId for the region which is already flushed", 217 HConstants.NO_SEQNUM, seqId); 218 219 region.close(); 220 } 221 } 222}