001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.Random; 026import org.apache.hadoop.hbase.Cell; 027import org.apache.hadoop.hbase.CellUtil; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseTestingUtility; 030import org.apache.hadoop.hbase.HColumnDescriptor; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.HConstants.OperationStatusCode; 033import org.apache.hadoop.hbase.HRegionInfo; 034import org.apache.hadoop.hbase.HTableDescriptor; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.Get; 037import org.apache.hadoop.hbase.client.Put; 038import org.apache.hadoop.hbase.client.Result; 039import org.apache.hadoop.hbase.testclassification.MediumTests; 040import org.apache.hadoop.hbase.testclassification.RegionServerTests; 041import org.apache.hadoop.hbase.util.Bytes; 042import 
org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Testing of multiPut in parallel.
 */
@Category({RegionServerTests.class, MediumTests.class})
public class TestParallelPut {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestParallelPut.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestParallelPut.class);

  @Rule
  public TestName name = new TestName();

  private HRegion region = null;
  private static HBaseTestingUtility HBTU = new HBaseTestingUtility();
  private static final int THREADS100 = 100;

  // Test names / fixture data shared by all tests.
  static byte[] tableName;
  static final byte[] qual1 = Bytes.toBytes("qual1");
  static final byte[] qual2 = Bytes.toBytes("qual2");
  static final byte[] qual3 = Bytes.toBytes("qual3");
  static final byte[] value1 = Bytes.toBytes("value1");
  static final byte[] value2 = Bytes.toBytes("value2");
  static final byte[] row = Bytes.toBytes("rowA");
  static final byte[] row2 = Bytes.toBytes("rowB");

  @BeforeClass
  public static void beforeClass() {
    // Make sure there are enough handlers for the THREADS100 concurrent putters.
    HBTU.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREADS100);
  }

  /**
   * Derive a per-test table name from the currently running test method.
   * @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
   */
  @Before
  public void setUp() throws Exception {
    tableName = Bytes.toBytes(name.getMethodName());
  }

  @After
  public void tearDown() throws Exception {
    EnvironmentEdgeManagerTestHelper.reset();
    if (region != null) {
      region.close(true);
    }
  }

  public String getName() {
    return name.getMethodName();
  }

  //////////////////////////////////////////////////////////////////////////////
  // New tests that don't spin up a mini cluster but rather just test the
  // individual code pieces in the HRegion.
  //////////////////////////////////////////////////////////////////////////////

  /**
   * Test one put command.
   */
  @Test
  public void testPut() throws IOException {
    LOG.info("Starting testPut");
    this.region = initHRegion(tableName, getName(), fam1);

    long value = 1L;

    Put put = new Put(row);
    put.addColumn(fam1, qual1, Bytes.toBytes(value));
    region.put(put);

    assertGet(this.region, row, fam1, qual1, Bytes.toBytes(value));
  }

  /**
   * Test multi-threaded Puts.
   */
  @Test
  public void testParallelPuts() throws IOException {
    LOG.info("Starting testParallelPuts");

    this.region = initHRegion(tableName, getName(), fam1);
    int numOps = 1000; // these many operations per thread

    // create 100 threads, each will do its own puts
    Putter[] all = new Putter[THREADS100];

    // create all threads
    for (int i = 0; i < THREADS100; i++) {
      all[i] = new Putter(region, i, numOps);
    }

    // run all threads
    for (int i = 0; i < THREADS100; i++) {
      all[i].start();
    }

    // wait for all threads to finish
    for (int i = 0; i < THREADS100; i++) {
      try {
        all[i].join();
      } catch (InterruptedException e) {
        LOG.warn("testParallelPuts encountered InterruptedException. Ignoring....", e);
        // FIX: restore the interrupt status so callers up the stack can
        // still observe the interruption (previously it was silently dropped).
        Thread.currentThread().interrupt();
      }
    }
    LOG.info("testParallelPuts successfully verified " +
        (numOps * THREADS100) + " put operations.");
  }

  /**
   * Run a Get against the region and assert exactly one cell comes back whose
   * value matches {@code value}.
   */
  private static void assertGet(final HRegion region, byte[] row, byte[] family,
      byte[] qualifier, byte[] value) throws IOException {
    // run a get and see if the value matches
    Get get = new Get(row);
    get.addColumn(family, qualifier);
    Result result = region.get(get);
    assertEquals(1, result.size());

    Cell kv = result.rawCells()[0];
    byte[] r = CellUtil.cloneValue(kv);
    assertTrue(Bytes.compareTo(r, value) == 0);
  }

  /**
   * Create a fresh local (non-cluster) region containing the given families.
   */
  private HRegion initHRegion(byte[] tableName, String callingMethod, byte[]... families)
      throws IOException {
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
    for (byte[] family : families) {
      htd.addFamily(new HColumnDescriptor(family));
    }
    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
    return HBTU.createLocalHRegion(info, htd);
  }

  /**
   * A thread that makes a few put calls
   */
  public static class Putter extends Thread {

    private final HRegion region;
    private final int threadNumber;
    private final int numOps;
    private final Random rand = new Random();
    byte[] rowkey = null;

    public Putter(HRegion region, int threadNumber, int numOps) {
      this.region = region;
      this.threadNumber = threadNumber;
      this.numOps = numOps;
      this.rowkey = Bytes.toBytes((long) threadNumber); // unique rowid per thread
      setDaemon(true);
    }

    @Override
    public void run() {
      byte[] value = new byte[100];
      Put[] in = new Put[1];

      // iterate for the specified number of operations
      for (int i = 0; i < numOps; i++) {
        // generate random bytes
        rand.nextBytes(value);

        // put the random bytes and verify that we can read it. This is one
        // way of ensuring that rwcc manipulation in HRegion.put() is fine.
        Put put = new Put(rowkey);
        put.addColumn(fam1, qual1, value);
        in[0] = put;
        try {
          OperationStatus[] ret = region.batchMutate(in);
          assertEquals(1, ret.length);
          assertEquals(OperationStatusCode.SUCCESS, ret[0].getOperationStatusCode());
          assertGet(this.region, rowkey, fam1, qual1, value);
        } catch (IOException e) {
          // FIX: fail with the root cause attached. The old
          // assertTrue(msg, false) threw the same AssertionError but
          // discarded the IOException, making failures undiagnosable.
          throw new AssertionError(
              "Thread id " + threadNumber + " operation " + i + " failed.", e);
        }
      }
    }
  }
}