001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertNotNull; 021import static org.junit.Assert.assertTrue; 022 023import org.apache.hadoop.hbase.HBaseClassTestRule; 024import org.apache.hadoop.hbase.HBaseTestingUtility; 025import org.apache.hadoop.hbase.HRegionLocation; 026import org.apache.hadoop.hbase.MiniHBaseCluster; 027import org.apache.hadoop.hbase.ServerName; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.Waiter; 030import org.apache.hadoop.hbase.regionserver.HRegionServer; 031import org.apache.hadoop.hbase.testclassification.ClientTests; 032import org.apache.hadoop.hbase.testclassification.LargeTests; 033import org.apache.hadoop.hbase.util.Bytes; 034import org.apache.hadoop.hbase.util.Pair; 035import org.junit.AfterClass; 036import org.junit.BeforeClass; 037import org.junit.ClassRule; 038import org.junit.Rule; 039import org.junit.Test; 040import org.junit.experimental.categories.Category; 041import org.junit.rules.TestName; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045@Category({ LargeTests.class, ClientTests.class }) 046public class TestHTableMultiplexerFlushCache { 047 048 @ClassRule 049 public static final HBaseClassTestRule CLASS_RULE = 050 HBaseClassTestRule.forClass(TestHTableMultiplexerFlushCache.class); 051 052 private static final Logger LOG = LoggerFactory.getLogger(TestHTableMultiplexerFlushCache.class); 053 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 054 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 055 private static byte[] QUALIFIER1 = Bytes.toBytes("testQualifier_1"); 056 private static byte[] QUALIFIER2 = Bytes.toBytes("testQualifier_2"); 057 private static byte[] VALUE1 = Bytes.toBytes("testValue1"); 058 private static byte[] VALUE2 = Bytes.toBytes("testValue2"); 059 private static int SLAVES = 3; 060 private static int PER_REGIONSERVER_QUEUE_SIZE = 100000; 061 062 @Rule 063 public TestName name = new TestName(); 064 065 /** 066 * @throws java.lang.Exception 067 */ 068 @BeforeClass 069 public static void setUpBeforeClass() throws Exception { 070 TEST_UTIL.startMiniCluster(SLAVES); 071 } 072 073 /** 074 * @throws java.lang.Exception 075 */ 076 @AfterClass 077 public static void tearDownAfterClass() throws Exception { 078 TEST_UTIL.shutdownMiniCluster(); 079 } 080 081 private static void checkExistence(final Table htable, final byte[] row, final byte[] family, 082 final byte[] quality, 083 final byte[] value) throws Exception { 084 // verify that the Get returns the correct result 085 TEST_UTIL.waitFor(30000, new Waiter.Predicate<Exception>() { 086 @Override 087 public boolean evaluate() throws Exception { 088 Result r; 089 Get get = new Get(row); 090 get.addColumn(family, quality); 091 r = htable.get(get); 092 return r != null && r.getValue(family, quality) != null 093 && Bytes.toStringBinary(value).equals( 094 Bytes.toStringBinary(r.getValue(family, quality))); 095 } 096 }); 097 } 098 099 @Test 100 public void testOnRegionChange() throws Exception { 101 final TableName tableName = TableName.valueOf(name.getMethodName()); 102 final int NUM_REGIONS = 10; 103 Table htable = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 3, 104 Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS); 105 106 HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(), 107 PER_REGIONSERVER_QUEUE_SIZE); 108 109 try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 110 byte[][] startRows = r.getStartKeys(); 111 byte[] row = startRows[1]; 112 assertTrue("2nd region should not start with empty row", row != null && row.length > 0); 113 114 Put put = new Put(row).addColumn(FAMILY, QUALIFIER1, VALUE1); 115 assertTrue("multiplexer.put returns", multiplexer.put(tableName, put)); 116 117 checkExistence(htable, row, FAMILY, QUALIFIER1, VALUE1); 118 119 // Now let's shutdown the regionserver and let regions moved to other servers. 120 HRegionLocation loc = r.getRegionLocation(row); 121 MiniHBaseCluster hbaseCluster = TEST_UTIL.getHBaseCluster(); 122 hbaseCluster.stopRegionServer(loc.getServerName()); 123 TEST_UTIL.waitUntilAllRegionsAssigned(tableName); 124 125 // put with multiplexer. 126 put = new Put(row).addColumn(FAMILY, QUALIFIER2, VALUE2); 127 assertTrue("multiplexer.put returns", multiplexer.put(tableName, put)); 128 129 checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2); 130 } 131 } 132 133 @Test 134 public void testOnRegionMove() throws Exception { 135 // This test is doing near exactly the same thing that testOnRegionChange but avoiding the 136 // potential to get a ConnectionClosingException. By moving the region, we can be certain that 137 // the connection is still valid and that the implementation is correctly handling an invalid 138 // Region cache (and not just tearing down the entire connection). 139 final TableName tableName = TableName.valueOf(name.getMethodName()); 140 final int NUM_REGIONS = 10; 141 Table htable = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 3, 142 Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS); 143 144 HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(), 145 PER_REGIONSERVER_QUEUE_SIZE); 146 147 final RegionLocator regionLocator = TEST_UTIL.getConnection().getRegionLocator(tableName); 148 Pair<byte[][],byte[][]> startEndRows = regionLocator.getStartEndKeys(); 149 byte[] row = startEndRows.getFirst()[1]; 150 assertTrue("2nd region should not start with empty row", row != null && row.length > 0); 151 152 Put put = new Put(row).addColumn(FAMILY, QUALIFIER1, VALUE1); 153 assertTrue("multiplexer.put returns", multiplexer.put(tableName, put)); 154 155 checkExistence(htable, row, FAMILY, QUALIFIER1, VALUE1); 156 157 final HRegionLocation loc = regionLocator.getRegionLocation(row); 158 final MiniHBaseCluster hbaseCluster = TEST_UTIL.getHBaseCluster(); 159 // The current server for the region we're writing to 160 final ServerName originalServer = loc.getServerName(); 161 ServerName newServer = null; 162 // Find a new server to move that region to 163 for (int i = 0; i < SLAVES; i++) { 164 HRegionServer rs = hbaseCluster.getRegionServer(i); 165 if (!rs.getServerName().equals(originalServer.getServerName())) { 166 newServer = rs.getServerName(); 167 break; 168 } 169 } 170 assertNotNull("Did not find a new RegionServer to use", newServer); 171 172 // Move the region 173 LOG.info("Moving " + loc.getRegionInfo().getEncodedName() + " from " + originalServer 174 + " to " + newServer); 175 TEST_UTIL.getAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(), 176 Bytes.toBytes(newServer.getServerName())); 177 178 TEST_UTIL.waitUntilAllRegionsAssigned(tableName); 179 180 // Send a new Put 181 put = new Put(row).addColumn(FAMILY, QUALIFIER2, VALUE2); 182 assertTrue("multiplexer.put returns", multiplexer.put(tableName, put)); 183 184 // We should see the update make it to the new server eventually 185 checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2); 186 } 187}