/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Testing of multiPut in parallel.
 *
 */
@Category({RegionServerTests.class, MediumTests.class})
public class TestParallelPut {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestParallelPut.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestParallelPut.class);
  @Rule public TestName name = new TestName();

  private HRegion region = null;
  // Never reassigned, so final (consistent with THREADS100 below).
  private static final HBaseTestingUtility HBTU = new HBaseTestingUtility();
  private static final int THREADS100 = 100;

  // Test names
  static byte[] tableName;
  static final byte[] qual1 = Bytes.toBytes("qual1");
  static final byte[] qual2 = Bytes.toBytes("qual2");
  static final byte[] qual3 = Bytes.toBytes("qual3");
  static final byte[] value1 = Bytes.toBytes("value1");
  static final byte[] value2 = Bytes.toBytes("value2");
  static final byte [] row = Bytes.toBytes("rowA");
  static final byte [] row2 = Bytes.toBytes("rowB");

  @BeforeClass
  public static void beforeClass() {
    // Make sure enough handlers.
    HBTU.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREADS100);
  }

  /**
   * @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
   */
  @Before
  public void setUp() throws Exception {
    // Derive a per-test table name from the running test method.
    tableName = Bytes.toBytes(name.getMethodName());
  }

  @After
  public void tearDown() throws Exception {
    EnvironmentEdgeManagerTestHelper.reset();
    if (region != null) {
      region.close(true);
    }
  }

  /** Returns the name of the currently running test method. */
  public String getName() {
    return name.getMethodName();
  }

  //////////////////////////////////////////////////////////////////////////////
  // New tests that don't spin up a mini cluster but rather just test the
  // individual code pieces in the HRegion.
  //////////////////////////////////////////////////////////////////////////////

  /**
   * Test one put command.
   */
  @Test
  public void testPut() throws IOException {
    LOG.info("Starting testPut");
    this.region = initHRegion(tableName, getName(), fam1);

    long value = 1L;

    Put put = new Put(row);
    put.addColumn(fam1, qual1, Bytes.toBytes(value));
    region.put(put);

    assertGet(this.region, row, fam1, qual1, Bytes.toBytes(value));
  }

  /**
   * Test multi-threaded Puts.
   */
  @Test
  public void testParallelPuts() throws IOException {

    LOG.info("Starting testParallelPuts");

    this.region = initHRegion(tableName, getName(), fam1);
    int numOps = 1000; // these many operations per thread

    // create 100 threads, each will do its own puts
    Putter[] all = new Putter[THREADS100];

    // create all threads
    for (int i = 0; i < THREADS100; i++) {
      all[i] = new Putter(region, i, numOps);
    }

    // run all threads
    for (int i = 0; i < THREADS100; i++) {
      all[i].start();
    }

    // wait for all threads to finish
    for (int i = 0; i < THREADS100; i++) {
      try {
        all[i].join();
      } catch (InterruptedException e) {
        LOG.warn("testParallelPuts encountered InterruptedException." +
                 " Ignoring....", e);
        // Restore the interrupt status so callers/other code can see it.
        Thread.currentThread().interrupt();
      }
    }

    // An exception or failed assertion in a Putter thread dies with that
    // thread and cannot fail the test by itself, so check each thread's
    // recorded outcome explicitly here on the test thread.
    for (int i = 0; i < THREADS100; i++) {
      assertTrue("Putter thread " + i + " reported a failure; see log for cause.",
          all[i].isSuccessful());
    }

    LOG.info("testParallelPuts successfully verified " +
             (numOps * THREADS100) + " put operations.");
  }

  private static void assertGet(final HRegion region, byte [] row, byte [] familiy,
      byte[] qualifier, byte[] value) throws IOException {
    // run a get and see if the value matches
    Get get = new Get(row);
    get.addColumn(familiy, qualifier);
    Result result = region.get(get);
    assertEquals(1, result.size());

    Cell kv = result.rawCells()[0];
    byte[] r = CellUtil.cloneValue(kv);
    assertTrue(Bytes.compareTo(r, value) == 0);
  }

  /**
   * Creates a local HRegion for the given table name with one column family
   * per entry of {@code families}.
   */
  private HRegion initHRegion(byte [] tableName, String callingMethod, byte[] ... families)
      throws IOException {
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
    for (byte [] family : families) {
      htd.addFamily(new HColumnDescriptor(family));
    }
    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
    return HBTU.createLocalHRegion(info, htd);
  }

  /**
   * A thread that makes a few put calls
   */
  public static class Putter extends Thread {

    private final HRegion region;
    private final int threadNumber;
    private final int numOps;
    private final Random rand = new Random();
    // Set when any operation in run() fails; read by the test thread after
    // join(), hence volatile.
    private volatile boolean failed = false;
    byte [] rowkey = null;

    public Putter(HRegion region, int threadNumber, int numOps) {
      this.region = region;
      this.threadNumber = threadNumber;
      this.numOps = numOps;
      this.rowkey = Bytes.toBytes((long)threadNumber); // unique rowid per thread
      setDaemon(true);
    }

    /**
     * @return true if every put and its verification completed without error.
     *   Only meaningful after the thread has finished (i.e. after join()).
     */
    public boolean isSuccessful() {
      return !failed;
    }

    @Override
    public void run() {
      byte[] value = new byte[100];
      Put[] in = new Put[1];

      try {
        // iterate for the specified number of operations
        for (int i = 0; i < numOps; i++) {
          // generate random bytes
          rand.nextBytes(value);

          // put the randombytes and verify that we can read it. This is one
          // way of ensuring that rwcc manipulation in HRegion.put() is fine.
          Put put = new Put(rowkey);
          put.addColumn(fam1, qual1, value);
          in[0] = put;
          OperationStatus[] ret = region.batchMutate(in);
          assertEquals(1, ret.length);
          assertEquals(OperationStatusCode.SUCCESS, ret[0].getOperationStatusCode());
          assertGet(this.region, rowkey, fam1, qual1, value);
        }
      } catch (Throwable t) {
        // Throwable (not just IOException) so failed assertEquals/assertGet
        // verifications are captured as well. An exception thrown out of a
        // spawned thread would otherwise vanish silently, so record the
        // failure for the test thread and keep the cause in the log.
        LOG.error("Thread id " + threadNumber + " failed.", t);
        failed = true;
      }
    }
  }
}