/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertFalse;

import java.io.IOException;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test many concurrent appenders to a WAL while rolling the log.
 */
@Category({RegionServerTests.class, MediumTests.class})
public class TestLogRollingNoCluster {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestLogRollingNoCluster.class);

  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private final static byte[] EMPTY_1K_ARRAY = new byte[1024];
  private static final int NUM_THREADS = 100; // Spin up this many threads
  private static final int NUM_ENTRIES = 100; // How many entries to write per thread

  /** ProtobufLogWriter that simulates higher latencies in the sync() call. */
  public static class HighLatencySyncWriter extends ProtobufLogWriter {
    @Override
    public void sync(boolean forceSync) throws IOException {
      // Sleep up to ~10ms on either side of the real sync to widen the
      // window in which appends and log rolls can interleave.
      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
      super.sync(forceSync);
      Threads.sleep(ThreadLocalRandom.current().nextInt(10));
    }
  }
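
  // Note: testContendedLogRolling below plugs HighLatencySyncWriter in through
  // the "hbase.regionserver.hlog.writer.impl" configuration key, so the extra
  // latency applies to every WAL sync the appender threads perform.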

  /**
   * Spin up a bunch of threads and have them all append to a WAL. Roll the
   * WAL frequently to try and trigger an NPE.
   */
  @Test
  public void testContendedLogRolling() throws Exception {
    TEST_UTIL.startMiniDFSCluster(3);
    Path dir = TEST_UTIL.getDataTestDirOnTestFS();

    // The implementation needs to know the 'handler' count.
    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS);
    final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
    CommonFSUtils.setRootDir(conf, dir);
    FSTableDescriptors fsTableDescriptors = new FSTableDescriptors(TEST_UTIL.getConfiguration());
    FSTableDescriptors.tryUpdateMetaTableDescriptor(TEST_UTIL.getConfiguration());
    TableDescriptor metaTableDescriptor = fsTableDescriptors.get(TableName.META_TABLE_NAME);
    conf.set("hbase.regionserver.hlog.writer.impl", HighLatencySyncWriter.class.getName());
    final WALFactory wals = new WALFactory(conf, TestLogRollingNoCluster.class.getName());
    final WAL wal = wals.getWAL(null);

    final int numThreads = NUM_THREADS;
    Appender[] appenders = new Appender[numThreads];
    try {
      for (int i = 0; i < numThreads; i++) {
        // Have each appending thread write 'count' entries
        appenders[i] = new Appender(metaTableDescriptor, wal, i, NUM_ENTRIES);
      }
      for (int i = 0; i < numThreads; i++) {
        appenders[i].start();
      }
      for (int i = 0; i < numThreads; i++) {
        // Ensure that all threads are joined before closing the wal
        appenders[i].join();
      }
    } finally {
      wals.close();
    }
    for (int i = 0; i < numThreads; i++) {
      assertFalse("Error: " + appenders[i].getException(), appenders[i].isException());
    }
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  /**
   * Appender thread. Appends to the passed WAL.
   */
  static class Appender extends Thread {
    private final Logger log;
    private final WAL wal;
    private final int count;
    private Exception e = null;
    private final TableDescriptor metaTableDescriptor;

    Appender(TableDescriptor metaTableDescriptor, final WAL wal, final int index, final int count) {
      super("" + index);
      this.wal = wal;
      this.count = count;
      this.metaTableDescriptor = metaTableDescriptor;
      this.log = LoggerFactory.getLogger("Appender:" + getName());
    }

    /**
     * @return True if this thread caught an exception while appending. Call after the thread has
     *   finished; while the thread is still alive this returns false.
     */
    boolean isException() {
      return !isAlive() && this.e != null;
    }

    Exception getException() {
      return this.e;
    }
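
    // Each appender writes 'count' 1KB edits against the meta region, rolling
    // the WAL every ten edits and syncing after every append, with small random
    // sleeps to encourage interleaving with concurrent rolls.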
    @Override
    public void run() {
      this.log.info(getName() + " started");
      final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
      try {
        for (int i = 0; i < this.count; i++) {
          long now = System.currentTimeMillis();
          // Roll every ten edits
          if (i % 10 == 0) {
            this.wal.rollWriter();
          }
          WALEdit edit = new WALEdit();
          byte[] bytes = Bytes.toBytes(i);
          edit.add(new KeyValue(bytes, bytes, bytes, now, EMPTY_1K_ARRAY));
          RegionInfo hri = RegionInfoBuilder.FIRST_META_REGIONINFO;
          // Mark every column family with replication scope 0 (local only).
          NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
          for (byte[] fam : this.metaTableDescriptor.getColumnFamilyNames()) {
            scopes.put(fam, 0);
          }
          final long txid = wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
            TableName.META_TABLE_NAME, now, mvcc, scopes), edit);
          Threads.sleep(ThreadLocalRandom.current().nextInt(5));
          wal.sync(txid);
        }
        String msg = getName() + " finished";
        if (isException()) {
          this.log.info(msg, getException());
        } else {
          this.log.info(msg);
        }
      } catch (Exception e) {
        this.e = e;
        log.info("Caught exception from Appender:" + getName(), e);
      } finally {
        // Call sync on our log, else threads just hang out.
        try {
          this.wal.sync();
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    }
  }

  // @org.junit.Rule
  // public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
  //   new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
}