001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertNotEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.List;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtility;
030import org.apache.hadoop.hbase.HRegionLocation;
031import org.apache.hadoop.hbase.MetaTableAccessor;
032import org.apache.hadoop.hbase.RegionLocations;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.testclassification.ClientTests;
035import org.apache.hadoop.hbase.testclassification.MediumTests;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.junit.AfterClass;
039import org.junit.BeforeClass;
040import org.junit.ClassRule;
041import org.junit.Rule;
042import org.junit.Test;
043import org.junit.experimental.categories.Category;
044import org.junit.rules.TestName;
045
046import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
047
048@Category({ MediumTests.class, ClientTests.class })
049public class TestRegionLocationCaching {
050
051  @ClassRule
052  public static final HBaseClassTestRule CLASS_RULE =
053    HBaseClassTestRule.forClass(TestRegionLocationCaching.class);
054
055  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
056  private static int SLAVES = 1;
057  private static int PER_REGIONSERVER_QUEUE_SIZE = 100000;
058  private static TableName TABLE_NAME = TableName.valueOf("TestRegionLocationCaching");
059  private static byte[] FAMILY = Bytes.toBytes("testFamily");
060  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
061
062  @Rule
063  public final TestName name = new TestName();
064
065  @BeforeClass
066  public static void setUpBeforeClass() throws Exception {
067    TEST_UTIL.startMiniCluster(SLAVES);
068    TEST_UTIL.createTable(TABLE_NAME, new byte[][] { FAMILY });
069    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
070  }
071
072  @AfterClass
073  public static void tearDownAfterClass() throws Exception {
074    TEST_UTIL.shutdownMiniCluster();
075  }
076
077  @Test
078  public void testDoNotCacheLocationWithNullServerNameWhenGetAllLocations() throws Exception {
079    TableName tableName = TableName.valueOf(name.getMethodName());
080    TEST_UTIL.createTable(tableName, new byte[][] { FAMILY });
081    TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
082
083    ConnectionImplementation conn = (ConnectionImplementation) TEST_UTIL.getConnection();
084    List<RegionInfo> regions = TEST_UTIL.getAdmin().getRegions(tableName);
085    RegionInfo chosen = regions.get(0);
086
087    // re-populate region cache
088    RegionLocator regionLocator = TEST_UTIL.getConnection().getRegionLocator(tableName);
089    regionLocator.clearRegionLocationCache();
090    regionLocator.getAllRegionLocations();
091
092    // expect all to be non-null at first
093    checkRegions(tableName, conn, regions, null);
094
095    // clear servername from region info
096    Put put = MetaTableAccessor.makePutFromRegionInfo(chosen, EnvironmentEdgeManager.currentTime());
097    MetaTableAccessor.addEmptyLocation(put, 0);
098    MetaTableAccessor.putsToMetaTable(TEST_UTIL.getConnection(), Lists.newArrayList(put));
099
100    // re-populate region cache again. check that we succeeded in nulling the servername
101    regionLocator.clearRegionLocationCache();
102    for (HRegionLocation loc : regionLocator.getAllRegionLocations()) {
103      if (loc.getRegion().equals(chosen)) {
104        assertNull(loc.getServerName());
105      }
106    }
107
108    // expect all but chosen to be non-null. chosen should be null because serverName was null
109    checkRegions(tableName, conn, regions, chosen);
110  }
111
112  private void checkRegions(TableName tableName, ConnectionImplementation conn,
113    List<RegionInfo> regions, RegionInfo chosen) {
114    for (RegionInfo region : regions) {
115      RegionLocations fromCache = conn.getCachedLocation(tableName, region.getStartKey());
116      if (region.equals(chosen)) {
117        assertNull(fromCache);
118      } else {
119        assertNotNull(fromCache);
120      }
121    }
122  }
123
124  @Test
125  public void testCachingForHTableMultiplexerSinglePut() throws Exception {
126    HTableMultiplexer multiplexer =
127      new HTableMultiplexer(TEST_UTIL.getConfiguration(), PER_REGIONSERVER_QUEUE_SIZE);
128    byte[] row = Bytes.toBytes("htable_multiplexer_single_put");
129    byte[] value = Bytes.toBytes("value");
130
131    Put put = new Put(row);
132    put.addColumn(FAMILY, QUALIFIER, value);
133    assertTrue("Put request not accepted by multiplexer queue", multiplexer.put(TABLE_NAME, put));
134
135    checkRegionLocationIsCached(TABLE_NAME, multiplexer.getConnection());
136    checkExistence(TABLE_NAME, row, FAMILY, QUALIFIER);
137
138    multiplexer.close();
139  }
140
141  @Test
142  public void testCachingForHTableMultiplexerMultiPut() throws Exception {
143    HTableMultiplexer multiplexer =
144      new HTableMultiplexer(TEST_UTIL.getConfiguration(), PER_REGIONSERVER_QUEUE_SIZE);
145
146    List<Put> multiput = new ArrayList<Put>();
147    for (int i = 0; i < 10; i++) {
148      Put put = new Put(Bytes.toBytes("htable_multiplexer_multi_put" + i));
149      byte[] value = Bytes.toBytes("value_" + i);
150      put.addColumn(FAMILY, QUALIFIER, value);
151      multiput.add(put);
152    }
153
154    List<Put> failedPuts = multiplexer.put(TABLE_NAME, multiput);
155    assertNull("All put requests were not accepted by multiplexer queue", failedPuts);
156
157    checkRegionLocationIsCached(TABLE_NAME, multiplexer.getConnection());
158    for (int i = 0; i < 10; i++) {
159      checkExistence(TABLE_NAME, Bytes.toBytes("htable_multiplexer_multi_put" + i), FAMILY,
160        QUALIFIER);
161    }
162
163    multiplexer.close();
164  }
165
166  @Test
167  public void testCachingForHTableSinglePut() throws Exception {
168    byte[] row = Bytes.toBytes("htable_single_put");
169    byte[] value = Bytes.toBytes("value");
170
171    Put put = new Put(row);
172    put.addColumn(FAMILY, QUALIFIER, value);
173
174    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
175      table.put(put);
176    }
177
178    checkRegionLocationIsCached(TABLE_NAME, TEST_UTIL.getConnection());
179    checkExistence(TABLE_NAME, row, FAMILY, QUALIFIER);
180  }
181
182  @Test
183  public void testCachingForHTableMultiPut() throws Exception {
184    List<Put> multiput = new ArrayList<Put>();
185    for (int i = 0; i < 10; i++) {
186      Put put = new Put(Bytes.toBytes("htable_multi_put" + i));
187      byte[] value = Bytes.toBytes("value_" + i);
188      put.addColumn(FAMILY, QUALIFIER, value);
189      multiput.add(put);
190    }
191
192    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
193      table.put(multiput);
194    }
195    checkRegionLocationIsCached(TABLE_NAME, TEST_UTIL.getConnection());
196    for (int i = 0; i < 10; i++) {
197      checkExistence(TABLE_NAME, Bytes.toBytes("htable_multi_put" + i), FAMILY, QUALIFIER);
198    }
199  }
200
201  /**
202   * Method to check whether the cached region location is non-empty for the given table. It repeats
203   * the same check several times as clearing of cache by some async operations may not reflect
204   * immediately.
205   */
206  private void checkRegionLocationIsCached(final TableName tableName, final Connection conn)
207    throws InterruptedException, IOException {
208    for (int count = 0; count < 50; count++) {
209      int number = ((ConnectionImplementation) conn).getNumberOfCachedRegionLocations(tableName);
210      assertNotEquals("Expected non-zero number of cached region locations", 0, number);
211      Thread.sleep(100);
212    }
213  }
214
215  /**
216   * Method to check whether the passed row exists in the given table
217   */
218  private static void checkExistence(final TableName tableName, final byte[] row,
219    final byte[] family, final byte[] qualifier) throws Exception {
220    // verify that the row exists
221    Result r;
222    Get get = new Get(row);
223    get.addColumn(family, qualifier);
224    int nbTry = 0;
225    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
226      do {
227        assertTrue("Failed to get row after " + nbTry + " tries", nbTry < 50);
228        nbTry++;
229        Thread.sleep(100);
230        r = table.get(get);
231      } while (r == null || r.getValue(family, qualifier) == null);
232    }
233  }
234}