001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.junit.Assert.assertFalse; 021 022import java.io.IOException; 023import java.util.NavigableMap; 024import java.util.TreeMap; 025import java.util.concurrent.ThreadLocalRandom; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseTestingUtility; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.KeyValue; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.client.RegionInfo; 034import org.apache.hadoop.hbase.client.RegionInfoBuilder; 035import org.apache.hadoop.hbase.client.TableDescriptor; 036import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 037import org.apache.hadoop.hbase.testclassification.MediumTests; 038import org.apache.hadoop.hbase.testclassification.RegionServerTests; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.hbase.util.FSUtils; 041import org.apache.hadoop.hbase.util.Threads; 042import org.apache.hadoop.hbase.wal.WAL; 043import org.apache.hadoop.hbase.wal.WALEdit; 044import org.apache.hadoop.hbase.wal.WALFactory; 045import org.apache.hadoop.hbase.wal.WALKeyImpl; 046import org.junit.ClassRule; 047import org.junit.Test; 048import org.junit.experimental.categories.Category; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052/** 053 * Test many concurrent appenders to an WAL while rolling the log. 054 */ 055@Category({RegionServerTests.class, MediumTests.class}) 056public class TestLogRollingNoCluster { 057 058 @ClassRule 059 public static final HBaseClassTestRule CLASS_RULE = 060 HBaseClassTestRule.forClass(TestLogRollingNoCluster.class); 061 062 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 063 private final static byte [] EMPTY_1K_ARRAY = new byte[1024]; 064 private static final int NUM_THREADS = 100; // Spin up this many threads 065 private static final int NUM_ENTRIES = 100; // How many entries to write 066 067 /** ProtobufLogWriter that simulates higher latencies in sync() call */ 068 public static class HighLatencySyncWriter extends ProtobufLogWriter { 069 @Override 070 public void sync() throws IOException { 071 Threads.sleep(ThreadLocalRandom.current().nextInt(10)); 072 super.sync(); 073 Threads.sleep(ThreadLocalRandom.current().nextInt(10)); 074 } 075 } 076 077 /** 078 * Spin up a bunch of threads and have them all append to a WAL. Roll the 079 * WAL frequently to try and trigger NPE. 080 * @throws IOException 081 * @throws InterruptedException 082 */ 083 @Test 084 public void testContendedLogRolling() throws Exception { 085 TEST_UTIL.startMiniDFSCluster(3); 086 Path dir = TEST_UTIL.getDataTestDirOnTestFS(); 087 088 // The implementation needs to know the 'handler' count. 089 TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS); 090 final Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 091 conf.set(WALFactory.WAL_PROVIDER, "filesystem"); 092 FSUtils.setRootDir(conf, dir); 093 conf.set("hbase.regionserver.hlog.writer.impl", HighLatencySyncWriter.class.getName()); 094 final WALFactory wals = new WALFactory(conf, TestLogRollingNoCluster.class.getName()); 095 final WAL wal = wals.getWAL(null); 096 097 Appender [] appenders = null; 098 099 final int numThreads = NUM_THREADS; 100 appenders = new Appender[numThreads]; 101 try { 102 for (int i = 0; i < numThreads; i++) { 103 // Have each appending thread write 'count' entries 104 appenders[i] = new Appender(wal, i, NUM_ENTRIES); 105 } 106 for (int i = 0; i < numThreads; i++) { 107 appenders[i].start(); 108 } 109 for (int i = 0; i < numThreads; i++) { 110 //ensure that all threads are joined before closing the wal 111 appenders[i].join(); 112 } 113 } finally { 114 wals.close(); 115 } 116 for (int i = 0; i < numThreads; i++) { 117 assertFalse(appenders[i].isException()); 118 } 119 TEST_UTIL.shutdownMiniDFSCluster(); 120 } 121 122 /** 123 * Appender thread. Appends to passed wal file. 124 */ 125 static class Appender extends Thread { 126 private final Logger log; 127 private final WAL wal; 128 private final int count; 129 private Exception e = null; 130 131 Appender(final WAL wal, final int index, final int count) { 132 super("" + index); 133 this.wal = wal; 134 this.count = count; 135 this.log = LoggerFactory.getLogger("Appender:" + getName()); 136 } 137 138 /** 139 * @return Call when the thread is done. 140 */ 141 boolean isException() { 142 return !isAlive() && this.e != null; 143 } 144 145 Exception getException() { 146 return this.e; 147 } 148 149 @Override 150 public void run() { 151 this.log.info(getName() +" started"); 152 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 153 try { 154 for (int i = 0; i < this.count; i++) { 155 long now = System.currentTimeMillis(); 156 // Roll every ten edits 157 if (i % 10 == 0) { 158 this.wal.rollWriter(); 159 } 160 WALEdit edit = new WALEdit(); 161 byte[] bytes = Bytes.toBytes(i); 162 edit.add(new KeyValue(bytes, bytes, bytes, now, EMPTY_1K_ARRAY)); 163 RegionInfo hri = RegionInfoBuilder.FIRST_META_REGIONINFO; 164 TableDescriptor htd = TEST_UTIL.getMetaTableDescriptorBuilder().build(); 165 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 166 for(byte[] fam : htd.getColumnFamilyNames()) { 167 scopes.put(fam, 0); 168 } 169 final long txid = wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), 170 TableName.META_TABLE_NAME, now, mvcc, scopes), edit, true); 171 Threads.sleep(ThreadLocalRandom.current().nextInt(5)); 172 wal.sync(txid); 173 } 174 String msg = getName() + " finished"; 175 if (isException()) 176 this.log.info(msg, getException()); 177 else 178 this.log.info(msg); 179 } catch (Exception e) { 180 this.e = e; 181 log.info("Caught exception from Appender:" + getName(), e); 182 } finally { 183 // Call sync on our log.else threads just hang out. 184 try { 185 this.wal.sync(); 186 } catch (IOException e) { 187 throw new RuntimeException(e); 188 } 189 } 190 } 191 } 192 193 //@org.junit.Rule 194 //public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = 195 // new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); 196}