001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertNotNull; 021import static org.junit.Assert.assertTrue; 022 023import org.apache.hadoop.hbase.HBaseClassTestRule; 024import org.apache.hadoop.hbase.HBaseTestingUtility; 025import org.apache.hadoop.hbase.HRegionLocation; 026import org.apache.hadoop.hbase.MiniHBaseCluster; 027import org.apache.hadoop.hbase.ServerName; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.Waiter; 030import org.apache.hadoop.hbase.regionserver.HRegionServer; 031import org.apache.hadoop.hbase.testclassification.ClientTests; 032import org.apache.hadoop.hbase.testclassification.LargeTests; 033import org.apache.hadoop.hbase.util.Bytes; 034import org.apache.hadoop.hbase.util.Pair; 035import org.junit.AfterClass; 036import org.junit.BeforeClass; 037import org.junit.ClassRule; 038import org.junit.Rule; 039import org.junit.Test; 040import org.junit.experimental.categories.Category; 041import org.junit.rules.TestName; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045@Category({ LargeTests.class, ClientTests.class }) 046public class TestHTableMultiplexerFlushCache { 047 048 @ClassRule 049 public static final HBaseClassTestRule CLASS_RULE = 050 HBaseClassTestRule.forClass(TestHTableMultiplexerFlushCache.class); 051 052 private static final Logger LOG = LoggerFactory.getLogger(TestHTableMultiplexerFlushCache.class); 053 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 054 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 055 private static byte[] QUALIFIER1 = Bytes.toBytes("testQualifier_1"); 056 private static byte[] QUALIFIER2 = Bytes.toBytes("testQualifier_2"); 057 private static byte[] VALUE1 = Bytes.toBytes("testValue1"); 058 private static byte[] VALUE2 = Bytes.toBytes("testValue2"); 059 private static int SLAVES = 3; 060 private static int PER_REGIONSERVER_QUEUE_SIZE = 100000; 061 062 @Rule 063 public TestName name = new TestName(); 064 065 /** 066 * @throws java.lang.Exception 067 */ 068 @BeforeClass 069 public static void setUpBeforeClass() throws Exception { 070 TEST_UTIL.startMiniCluster(SLAVES); 071 } 072 073 /** 074 * @throws java.lang.Exception 075 */ 076 @AfterClass 077 public static void tearDownAfterClass() throws Exception { 078 TEST_UTIL.shutdownMiniCluster(); 079 } 080 081 private static void checkExistence(final Table htable, final byte[] row, final byte[] family, 082 final byte[] quality, final byte[] value) throws Exception { 083 // verify that the Get returns the correct result 084 TEST_UTIL.waitFor(30000, new Waiter.Predicate<Exception>() { 085 @Override 086 public boolean evaluate() throws Exception { 087 Result r; 088 Get get = new Get(row); 089 get.addColumn(family, quality); 090 r = htable.get(get); 091 return r != null && r.getValue(family, quality) != null 092 && Bytes.toStringBinary(value).equals(Bytes.toStringBinary(r.getValue(family, quality))); 093 } 094 }); 095 } 096 097 @Test 098 public void testOnRegionChange() throws Exception { 099 final TableName tableName = TableName.valueOf(name.getMethodName()); 100 final int NUM_REGIONS = 10; 101 Table htable = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 3, 102 Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS); 103 104 HTableMultiplexer multiplexer = 105 new HTableMultiplexer(TEST_UTIL.getConfiguration(), PER_REGIONSERVER_QUEUE_SIZE); 106 107 try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 108 byte[][] startRows = r.getStartKeys(); 109 byte[] row = startRows[1]; 110 assertTrue("2nd region should not start with empty row", row != null && row.length > 0); 111 112 Put put = new Put(row).addColumn(FAMILY, QUALIFIER1, VALUE1); 113 assertTrue("multiplexer.put returns", multiplexer.put(tableName, put)); 114 115 checkExistence(htable, row, FAMILY, QUALIFIER1, VALUE1); 116 117 // Now let's shutdown the regionserver and let regions moved to other servers. 118 HRegionLocation loc = r.getRegionLocation(row); 119 MiniHBaseCluster hbaseCluster = TEST_UTIL.getHBaseCluster(); 120 hbaseCluster.stopRegionServer(loc.getServerName()); 121 TEST_UTIL.waitUntilAllRegionsAssigned(tableName); 122 123 // put with multiplexer. 124 put = new Put(row).addColumn(FAMILY, QUALIFIER2, VALUE2); 125 assertTrue("multiplexer.put returns", multiplexer.put(tableName, put)); 126 127 checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2); 128 } 129 } 130 131 @Test 132 public void testOnRegionMove() throws Exception { 133 // This test is doing near exactly the same thing that testOnRegionChange but avoiding the 134 // potential to get a ConnectionClosingException. By moving the region, we can be certain that 135 // the connection is still valid and that the implementation is correctly handling an invalid 136 // Region cache (and not just tearing down the entire connection). 137 final TableName tableName = TableName.valueOf(name.getMethodName()); 138 final int NUM_REGIONS = 10; 139 Table htable = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 3, 140 Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS); 141 142 HTableMultiplexer multiplexer = 143 new HTableMultiplexer(TEST_UTIL.getConfiguration(), PER_REGIONSERVER_QUEUE_SIZE); 144 145 final RegionLocator regionLocator = TEST_UTIL.getConnection().getRegionLocator(tableName); 146 Pair<byte[][], byte[][]> startEndRows = regionLocator.getStartEndKeys(); 147 byte[] row = startEndRows.getFirst()[1]; 148 assertTrue("2nd region should not start with empty row", row != null && row.length > 0); 149 150 Put put = new Put(row).addColumn(FAMILY, QUALIFIER1, VALUE1); 151 assertTrue("multiplexer.put returns", multiplexer.put(tableName, put)); 152 153 checkExistence(htable, row, FAMILY, QUALIFIER1, VALUE1); 154 155 final HRegionLocation loc = regionLocator.getRegionLocation(row); 156 final MiniHBaseCluster hbaseCluster = TEST_UTIL.getHBaseCluster(); 157 // The current server for the region we're writing to 158 final ServerName originalServer = loc.getServerName(); 159 ServerName newServer = null; 160 // Find a new server to move that region to 161 for (int i = 0; i < SLAVES; i++) { 162 HRegionServer rs = hbaseCluster.getRegionServer(i); 163 if (!rs.getServerName().equals(originalServer.getServerName())) { 164 newServer = rs.getServerName(); 165 break; 166 } 167 } 168 assertNotNull("Did not find a new RegionServer to use", newServer); 169 170 // Move the region 171 LOG.info("Moving " + loc.getRegionInfo().getEncodedName() + " from " + originalServer + " to " 172 + newServer); 173 TEST_UTIL.getAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(), 174 Bytes.toBytes(newServer.getServerName())); 175 176 TEST_UTIL.waitUntilAllRegionsAssigned(tableName); 177 178 // Send a new Put 179 put = new Put(row).addColumn(FAMILY, QUALIFIER2, VALUE2); 180 assertTrue("multiplexer.put returns", multiplexer.put(tableName, put)); 181 182 // We should see the update make it to the new server eventually 183 checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2); 184 } 185}