001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.client;
020
021import static org.junit.Assert.assertNotEquals;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.List;
028
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtility;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.testclassification.ClientTests;
033import org.apache.hadoop.hbase.testclassification.MediumTests;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.junit.AfterClass;
036import org.junit.BeforeClass;
037import org.junit.ClassRule;
038import org.junit.Test;
039import org.junit.experimental.categories.Category;
040
041@Category({MediumTests.class, ClientTests.class})
042public class TestRegionLocationCaching {
043
044  @ClassRule
045  public static final HBaseClassTestRule CLASS_RULE =
046      HBaseClassTestRule.forClass(TestRegionLocationCaching.class);
047
048  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
049  private static int SLAVES = 1;
050  private static int PER_REGIONSERVER_QUEUE_SIZE = 100000;
051  private static TableName TABLE_NAME = TableName.valueOf("TestRegionLocationCaching");
052  private static byte[] FAMILY = Bytes.toBytes("testFamily");
053  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
054
055  @BeforeClass
056  public static void setUpBeforeClass() throws Exception {
057    TEST_UTIL.startMiniCluster(SLAVES);
058    TEST_UTIL.createTable(TABLE_NAME, new byte[][] { FAMILY });
059    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
060  }
061
062  @AfterClass
063  public static void tearDownAfterClass() throws Exception {
064    TEST_UTIL.shutdownMiniCluster();
065  }
066
067  @Test
068  public void testCachingForHTableMultiplexerSinglePut() throws Exception {
069    HTableMultiplexer multiplexer =
070        new HTableMultiplexer(TEST_UTIL.getConfiguration(), PER_REGIONSERVER_QUEUE_SIZE);
071    byte[] row = Bytes.toBytes("htable_multiplexer_single_put");
072    byte[] value = Bytes.toBytes("value");
073
074    Put put = new Put(row);
075    put.addColumn(FAMILY, QUALIFIER, value);
076    assertTrue("Put request not accepted by multiplexer queue", multiplexer.put(TABLE_NAME, put));
077
078    checkRegionLocationIsCached(TABLE_NAME, multiplexer.getConnection());
079    checkExistence(TABLE_NAME, row, FAMILY, QUALIFIER);
080
081    multiplexer.close();
082  }
083
084  @Test
085  public void testCachingForHTableMultiplexerMultiPut() throws Exception {
086    HTableMultiplexer multiplexer =
087        new HTableMultiplexer(TEST_UTIL.getConfiguration(), PER_REGIONSERVER_QUEUE_SIZE);
088
089    List<Put> multiput = new ArrayList<Put>();
090    for (int i = 0; i < 10; i++) {
091      Put put = new Put(Bytes.toBytes("htable_multiplexer_multi_put" + i));
092      byte[] value = Bytes.toBytes("value_" + i);
093      put.addColumn(FAMILY, QUALIFIER, value);
094      multiput.add(put);
095    }
096
097    List<Put> failedPuts = multiplexer.put(TABLE_NAME, multiput);
098    assertNull("All put requests were not accepted by multiplexer queue", failedPuts);
099
100    checkRegionLocationIsCached(TABLE_NAME, multiplexer.getConnection());
101    for (int i = 0; i < 10; i++) {
102      checkExistence(TABLE_NAME, Bytes.toBytes("htable_multiplexer_multi_put" + i), FAMILY,
103        QUALIFIER);
104    }
105
106    multiplexer.close();
107  }
108
109  @Test
110  public void testCachingForHTableSinglePut() throws Exception {
111    byte[] row = Bytes.toBytes("htable_single_put");
112    byte[] value = Bytes.toBytes("value");
113
114    Put put = new Put(row);
115    put.addColumn(FAMILY, QUALIFIER, value);
116
117    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
118      table.put(put);
119    }
120
121    checkRegionLocationIsCached(TABLE_NAME, TEST_UTIL.getConnection());
122    checkExistence(TABLE_NAME, row, FAMILY, QUALIFIER);
123  }
124
125  @Test
126  public void testCachingForHTableMultiPut() throws Exception {
127    List<Put> multiput = new ArrayList<Put>();
128    for (int i = 0; i < 10; i++) {
129      Put put = new Put(Bytes.toBytes("htable_multi_put" + i));
130      byte[] value = Bytes.toBytes("value_" + i);
131      put.addColumn(FAMILY, QUALIFIER, value);
132      multiput.add(put);
133    }
134
135    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
136      table.put(multiput);
137    }
138    checkRegionLocationIsCached(TABLE_NAME, TEST_UTIL.getConnection());
139    for (int i = 0; i < 10; i++) {
140      checkExistence(TABLE_NAME, Bytes.toBytes("htable_multi_put" + i), FAMILY, QUALIFIER);
141    }
142  }
143
144  /**
145   * Method to check whether the cached region location is non-empty for the given table. It repeats
146   * the same check several times as clearing of cache by some async operations may not reflect
147   * immediately.
148   */
149  private void checkRegionLocationIsCached(final TableName tableName, final Connection conn)
150      throws InterruptedException, IOException {
151    for (int count = 0; count < 50; count++) {
152      int number = ((ConnectionImplementation) conn).getNumberOfCachedRegionLocations(tableName);
153      assertNotEquals("Expected non-zero number of cached region locations", 0, number);
154      Thread.sleep(100);
155    }
156  }
157
158  /**
159   * Method to check whether the passed row exists in the given table
160   */
161  private static void checkExistence(final TableName tableName, final byte[] row,
162      final byte[] family, final byte[] qualifier) throws Exception {
163    // verify that the row exists
164    Result r;
165    Get get = new Get(row);
166    get.addColumn(family, qualifier);
167    int nbTry = 0;
168    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
169      do {
170        assertTrue("Failed to get row after " + nbTry + " tries", nbTry < 50);
171        nbTry++;
172        Thread.sleep(100);
173        r = table.get(get);
174      } while (r == null || r.getValue(family, qualifier) == null);
175    }
176  }
177}