/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

/**
 * Provides FSHLog test cases.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestFSHLog extends AbstractTestFSWAL {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFSHLog.class);

  @Rule
  public TestName name = new TestName();

  /**
   * Builds an {@link FSHLog} from the given arguments and calls {@code init()} on it before
   * handing it back.
   * @return the initialized WAL
   * @throws IOException if constructing or initializing the WAL fails
   */
  @Override
  protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String walDir, String archiveDir,
    Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
    String prefix, String suffix) throws IOException {
    FSHLog fshLog = new FSHLog(fs, rootDir, walDir, archiveDir,
      conf, listeners, failIfWALExists, prefix, suffix);
    fshLog.init();
    return fshLog;
  }

  /**
   * Builds an {@link FSHLog} whose {@code atHeadOfRingBufferEventHandlerAppend()} hook runs
   * {@code action} before delegating to the superclass implementation, then calls
   * {@code init()} on it.
   * @param action callback executed at the head of every ring-buffer append hook invocation
   * @return the initialized WAL
   * @throws IOException if constructing or initializing the WAL fails
   */
  @Override
  protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String walDir,
    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
    boolean failIfWALExists, String prefix, String suffix, final Runnable action)
    throws IOException {
    FSHLog fshLog = new FSHLog(fs, rootDir, walDir, archiveDir,
      conf, listeners, failIfWALExists, prefix, suffix) {

      @Override
      protected void atHeadOfRingBufferEventHandlerAppend() {
        action.run();
        super.atHeadOfRingBufferEventHandlerAppend();
      }
    };
    fshLog.init();
    return fshLog;
  }

  /**
   * Uses reflection to set the private {@code syncRunnerIndex} of the WAL's
   * {@code ringBufferEventHandler} to {@code Integer.MAX_VALUE - 1} and then adds edits ten
   * times. The test has no explicit assertions: it passes when none of those calls throws.
   * NOTE(review): presumably the index is advanced as edits are synced, so these writes push it
   * past {@code Integer.MAX_VALUE} -- confirm against FSHLog.
   */
  @Test
  public void testSyncRunnerIndexOverflow() throws IOException, NoSuchFieldException,
    SecurityException, IllegalArgumentException, IllegalAccessException {
    final String name = this.name.getMethodName();
    // try-with-resources so the WAL is closed even when init() itself fails.
    try (FSHLog log = new FSHLog(FS, CommonFSUtils.getRootDir(CONF), name,
      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
      log.init();
      Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler");
      ringBufferEventHandlerField.setAccessible(true);
      FSHLog.RingBufferEventHandler ringBufferEventHandler =
        (FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log);
      Field syncRunnerIndexField =
        FSHLog.RingBufferEventHandler.class.getDeclaredField("syncRunnerIndex");
      syncRunnerIndexField.setAccessible(true);
      syncRunnerIndexField.set(ringBufferEventHandler, Integer.MAX_VALUE - 1);
      TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
      NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (byte[] fam : htd.getColumnFamilyNames()) {
        scopes.put(fam, 0);
      }
      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
      MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
      for (int i = 0; i < 10; i++) {
        addEdits(log, hri, htd, 1, mvcc, scopes);
      }
    }
  }

  /**
   * Test case for https://issues.apache.org/jira/browse/HBASE-16721
   * <p>
   * A WAL listener blocks the second put's log entry before it is written, a flush is started
   * while that put is held, and only then is the put released. Afterwards the region must have
   * exactly one store file and the WAL must report {@link HConstants#NO_SEQNUM} as the region's
   * earliest memstore sequence id.
   */
  @Test
  public void testUnflushedSeqIdTracking() throws IOException, InterruptedException {
    final String name = this.name.getMethodName();
    final byte[] b = Bytes.toBytes("b");

    final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
    // Set only when the corresponding worker ran to completion without throwing.
    final AtomicBoolean putSucceeded = new AtomicBoolean(false);
    final AtomicBoolean flushSucceeded = new AtomicBoolean(false);
    final CountDownLatch holdAppend = new CountDownLatch(1);
    final CountDownLatch flushFinished = new CountDownLatch(1);
    final CountDownLatch putFinished = new CountDownLatch(1);

    try (FSHLog log = new FSHLog(FS, CommonFSUtils.getRootDir(CONF), name,
      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
      log.init();
      log.registerWALActionsListener(new WALActionsListener() {
        @Override
        public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit)
          throws IOException {
          if (startHoldingForAppend.get()) {
            try {
              holdAppend.await();
            } catch (InterruptedException e) {
              LOG.error(e.toString(), e);
              // Keep the interrupt visible to whoever owns this thread.
              Thread.currentThread().interrupt();
            }
          }
        }
      });

      // open a new region which uses this WAL
      TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
      ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
      final HRegion region = TEST_UTIL.createLocalHRegion(hri, htd, log);
      ExecutorService exec = Executors.newFixedThreadPool(2);
      try {
        // do a regular write first because of memstore size calculation.
        region.put(new Put(b).addColumn(b, b, b));

        startHoldingForAppend.set(true);
        exec.submit(new Runnable() {
          @Override
          public void run() {
            try {
              region.put(new Put(b).addColumn(b, b, b));
              putSucceeded.set(true);
            } catch (IOException e) {
              LOG.error(e.toString(), e);
            } finally {
              // Always release the waiter; a failed put must not hang the test.
              putFinished.countDown();
            }
          }
        });

        // give the put a chance to start
        Threads.sleep(3000);

        exec.submit(new Runnable() {
          @Override
          public void run() {
            try {
              HRegion.FlushResult flushResult = region.flush(true);
              LOG.info("Flush result:" + flushResult.getResult());
              LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded());
              flushSucceeded.set(true);
            } catch (IOException e) {
              LOG.error(e.toString(), e);
            } finally {
              // Always release the waiter; a failed flush must not hang the test.
              flushFinished.countDown();
            }
          }
        });

        // give the flush a chance to start. Flush should have got the region lock, and
        // should have been waiting on the mvcc complete after this.
        Threads.sleep(3000);

        // let the append to WAL go through now that the flush already started
        holdAppend.countDown();
        putFinished.await();
        flushFinished.await();

        // Fail fast with a clear message if either background operation threw.
        if (!putSucceeded.get()) {
          throw new AssertionError("Background put did not complete; see log for the cause");
        }
        if (!flushSucceeded.get()) {
          throw new AssertionError("Background flush did not complete; see log for the cause");
        }

        // check whether flush went through
        assertEquals("Region did not flush?", 1, region.getStoreFileList(new byte[][] { b }).size());

        // now check the region's unflushed seqIds.
        long seqId = log.getEarliestMemStoreSeqNum(hri.getEncodedNameAsBytes());
        assertEquals("Found seqId for the region which is already flushed",
          HConstants.NO_SEQNUM, seqId);
      } finally {
        // Unblock a possibly still-held append before tearing down, so close cannot wait on it.
        holdAppend.countDown();
        exec.shutdownNow();
        region.close();
      }
    }
  }
}