/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase;

import java.security.InvalidParameterException;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LoadTestKVGenerator;
import org.apache.hadoop.hbase.util.MultiThreadedWriter;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test that verifies lazy CF loading during scans by doing repeated scans
 * with this feature while multiple threads are continuously writing values; and
 * verifying the result.
 */
@Category(IntegrationTests.class)
public class IntegrationTestLazyCfLoading {
  private static final TableName TABLE_NAME =
      TableName.valueOf(IntegrationTestLazyCfLoading.class.getSimpleName());
  private static final String TIMEOUT_KEY = "hbase.%s.timeout";
  private static final String ENCODING_KEY = "hbase.%s.datablock.encoding";

  /** A soft test timeout; duration of the test, as such, depends on number of keys to put. */
  private static final int DEFAULT_TIMEOUT_MINUTES = 10;

  private static final int NUM_SERVERS = 1;
  /** Set regions per server low to ensure splits happen during test */
  private static final int REGIONS_PER_SERVER = 3;
  private static final int KEYS_TO_WRITE_PER_SERVER = 20000;
  private static final int WRITER_THREADS = 10;
  private static final int WAIT_BETWEEN_SCANS_MS = 1000;

  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestLazyCfLoading.class);
  private IntegrationTestingUtility util = new IntegrationTestingUtility();
  private final DataGenerator dataGen = new DataGenerator();

  /**
   * Custom LoadTestDataGenerator. Uses key generation and verification from
   * LoadTestKVGenerator. Creates 3 column families; one with an integer column to
   * filter on, the 2nd one with an integer column that matches the first integer column (for
   * test-specific verification), and byte[] value that is used for general verification; and
   * the third one with just the value.
   */
  private static class DataGenerator extends LoadTestDataGenerator {
    private static final int MIN_DATA_SIZE = 4096;
    private static final int MAX_DATA_SIZE = 65536;
    public static final byte[] ESSENTIAL_CF = Bytes.toBytes("essential");
    public static final byte[] JOINED_CF1 = Bytes.toBytes("joined");
    public static final byte[] JOINED_CF2 = Bytes.toBytes("joined2");
    public static final byte[] FILTER_COLUMN = Bytes.toBytes("filter");
    public static final byte[] VALUE_COLUMN = Bytes.toBytes("val");
    public static final long ACCEPTED_VALUE = 1L;

    /** Column family to the columns written in that family; keyed with a byte[] comparator. */
    private static final Map<byte[], byte[][]> columnMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);

    // Populated once at class initialization rather than on every construction, so the shared
    // static map is never mutated after it becomes visible to instances.
    static {
      columnMap.put(ESSENTIAL_CF, new byte[][] { FILTER_COLUMN });
      columnMap.put(JOINED_CF1, new byte[][] { FILTER_COLUMN, VALUE_COLUMN });
      columnMap.put(JOINED_CF2, new byte[][] { VALUE_COLUMN });
    }

    /** Count of generated essential-CF filter values equal to {@link #ACCEPTED_VALUE}. */
    private final AtomicLong expectedNumberOfKeys = new AtomicLong(0);
    /** Count of all generated essential-CF filter values. */
    private final AtomicLong totalNumberOfKeys = new AtomicLong(0);

    public DataGenerator() {
      super(MIN_DATA_SIZE, MAX_DATA_SIZE);
    }

    /**
     * @return how many filter values equal to {@link #ACCEPTED_VALUE} have been generated for
     *         the essential CF so far
     */
    public long getExpectedNumberOfKeys() {
      return expectedNumberOfKeys.get();
    }

    /** @return how many filter values have been generated for the essential CF so far */
    public long getTotalNumberOfKeys() {
      return totalNumberOfKeys.get();
    }

    @Override
    public byte[] getDeterministicUniqueKey(long keyBase) {
      // Bytes.toBytes(String) encodes as UTF-8, unlike String.getBytes() which depends on the
      // platform default charset.
      return Bytes.toBytes(LoadTestKVGenerator.md5PrefixedKey(keyBase));
    }

    @Override
    public byte[][] getColumnFamilies() {
      return columnMap.keySet().toArray(new byte[columnMap.size()][]);
    }

    @Override
    public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
      return columnMap.get(cf);
    }

    /**
     * Generates the value for one cell. The filter column gets a 0/1 long derived from the
     * first four characters of the row key; the value column gets a random-size value from
     * the KV generator.
     * @throws InvalidParameterException if {@code column} is neither of the known columns
     */
    @Override
    public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
      if (Bytes.BYTES_COMPARATOR.compare(column, FILTER_COLUMN) == 0) {
        // Random deterministic way to make some values "on" and others "off" for filters.
        long value = Long.parseLong(Bytes.toString(rowKey, 0, 4), 16) & ACCEPTED_VALUE;
        // Count each row once, via its essential-CF filter cell only.
        if (Bytes.BYTES_COMPARATOR.compare(cf, ESSENTIAL_CF) == 0) {
          totalNumberOfKeys.incrementAndGet();
          if (value == ACCEPTED_VALUE) {
            expectedNumberOfKeys.incrementAndGet();
          }
        }
        return Bytes.toBytes(value);
      } else if (Bytes.BYTES_COMPARATOR.compare(column, VALUE_COLUMN) == 0) {
        return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
      }
      String error = "Unknown column " + Bytes.toString(column);
      assert false : error;
      throw new InvalidParameterException(error);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
      if (Bytes.BYTES_COMPARATOR.compare(column, FILTER_COLUMN) == 0) {
        // Relies on the filter from getScanFilter being used.
        return Bytes.toLong(value) == ACCEPTED_VALUE;
      } else if (Bytes.BYTES_COMPARATOR.compare(column, VALUE_COLUMN) == 0) {
        return LoadTestKVGenerator.verify(value, rowKey, cf, column);
      }
      return false; // some bogus value from read, we don't expect any such thing.
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
      byte[][] expectedColumns = columnMap.get(cf);
      // An unknown column family is a verification failure, not a NullPointerException.
      return expectedColumns != null && expectedColumns.length == columnSet.size();
    }

    /**
     * @return a filter accepting only rows whose essential-CF filter column equals
     *         {@link #ACCEPTED_VALUE}; rows missing that column are filtered out
     */
    public Filter getScanFilter() {
      SingleColumnValueFilter scf = new SingleColumnValueFilter(ESSENTIAL_CF, FILTER_COLUMN,
          CompareOperator.EQUAL, Bytes.toBytes(ACCEPTED_VALUE));
      scf.setFilterIfMissing(true);
      return scf;
    }
  }

  @Before
  public void setUp() throws Exception {
    LOG.info("Initializing cluster with {} servers", NUM_SERVERS);
    util.initializeCluster(NUM_SERVERS);
    LOG.info("Done initializing cluster");
    createTable();
    // after table creation, ACLs need time to be propagated to RSs in a secure deployment
    // so we sleep a little bit because we don't have a good way to know when permissions
    // are received by RSs
    Thread.sleep(3000);
  }

  /**
   * Drops any existing test table and recreates it with one column family per generator
   * family, pre-split into {@code REGIONS_PER_SERVER} regions per live server. The data block
   * encoding is read from configuration and defaults to FAST_DIFF.
   */
  private void createTable() throws Exception {
    deleteTable();
    LOG.info("Creating table");
    Configuration conf = util.getConfiguration();
    String encodingKey = String.format(ENCODING_KEY, this.getClass().getSimpleName());
    DataBlockEncoding blockEncoding = DataBlockEncoding.valueOf(conf.get(encodingKey, "FAST_DIFF"));
    HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
    for (byte[] cf : dataGen.getColumnFamilies()) {
      HColumnDescriptor hcd = new HColumnDescriptor(cf);
      hcd.setDataBlockEncoding(blockEncoding);
      htd.addFamily(hcd);
    }
    int serverCount = util.getHBaseClusterInterface().getClusterMetrics()
        .getLiveServerMetrics().size();
    byte[][] splits = new RegionSplitter.HexStringSplit().split(serverCount * REGIONS_PER_SERVER);
    util.getAdmin().createTable(htd, splits);
    LOG.info("Created table");
  }

  /** Deletes the test table if it exists. */
  private void deleteTable() throws Exception {
    if (util.getAdmin().tableExists(TABLE_NAME)) {
      LOG.info("Deleting table");
      util.deleteTable(TABLE_NAME);
      LOG.info("Deleted table");
    }
  }

  @After
  public void tearDown() throws Exception {
    deleteTable();
    LOG.info("Restoring the cluster");
    util.restoreCluster();
    LOG.info("Done restoring the cluster");
  }

  /**
   * Starts a multi-threaded writer and, until it finishes or the soft timeout elapses,
   * repeatedly scans the table with on-demand column family loading and verifies every
   * returned row. Once the writer is done, the scan must return exactly the number of
   * accepted keys the generator produced.
   */
  @Test
  public void testReadersAndWriters() throws Exception {
    Configuration conf = util.getConfiguration();
    String timeoutKey = String.format(TIMEOUT_KEY, this.getClass().getSimpleName());
    long maxRuntime = conf.getLong(timeoutKey, DEFAULT_TIMEOUT_MINUTES);
    long serverCount = util.getHBaseClusterInterface().getClusterMetrics()
        .getLiveServerMetrics().size();
    long keysToWrite = serverCount * KEYS_TO_WRITE_PER_SERVER;

    // try-with-resources so the table and connection are released even when an assertion
    // fails part-way through the test.
    try (Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TABLE_NAME)) {
      // Create multi-threaded writer and start it. We write multiple columns/CFs and verify
      // their integrity, therefore multi-put is necessary.
      MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
      writer.setMultiPut(true);

      LOG.info("Starting writer; the number of keys to write is {}", keysToWrite);
      // TODO : Need to see if tag support has to be given here in the integration test suite
      writer.start(1, keysToWrite, WRITER_THREADS);

      // Now, do scans.
      long now = EnvironmentEdgeManager.currentTime();
      long timeLimit = now + (maxRuntime * 60000); // maxRuntime is in minutes
      boolean isWriterDone = false;
      while (now < timeLimit && !isWriterDone) {
        LOG.info("Starting the scan; wrote approximately {} keys",
            dataGen.getTotalNumberOfKeys());
        // Sample the writer state before the scan: only a scan that started after the writer
        // finished is required to see every accepted key.
        isWriterDone = writer.isDone();
        if (isWriterDone) {
          LOG.info("Scanning full result, writer is done");
        }
        scanAndVerify(table, writer, isWriterDone);
        if (!isWriterDone) {
          Thread.sleep(WAIT_BETWEEN_SCANS_MS);
          now = EnvironmentEdgeManager.currentTime();
        }
      }
      Assert.assertEquals("There are write failures", 0, writer.getNumWriteFailures());
      Assert.assertTrue("Writer is not done", isWriterDone);
    }
  }

  /** Builds a scan over all generator column families with the filter and lazy CF loading. */
  private Scan createLazyLoadingScan() {
    Scan scan = new Scan();
    for (byte[] cf : dataGen.getColumnFamilies()) {
      scan.addFamily(cf);
    }
    scan.setFilter(dataGen.getScanFilter());
    scan.setLoadColumnFamiliesOnDemand(true);
    return scan;
  }

  /**
   * Runs one filtered scan, verifies each row against the data generator and checks the row
   * count against the number of accepted keys generated.
   * @param table table to scan
   * @param writer writer whose verification logic is applied to each result
   * @param isWriterDone whether the writer had finished before this scan started; if so the
   *          row count must match the generated count exactly
   */
  private void scanAndVerify(Table table, MultiThreadedWriter writer, boolean isWriterDone)
      throws Exception {
    Scan scan = createLazyLoadingScan();
    // The number of keys we can expect from scan - lower bound (before scan).
    // Not a strict lower bound - writer knows nothing about filters, so we report
    // this from generator. Writer might have generated the value but not put it yet.
    long onesGennedBeforeScan = dataGen.getExpectedNumberOfKeys();
    long startTs = EnvironmentEdgeManager.currentTime();
    long resultCount = 0;
    // Close the scanner after every pass so scanner resources are not leaked per iteration.
    try (ResultScanner results = table.getScanner(scan)) {
      Result result;
      // Verify and count the results.
      while ((result = results.next()) != null) {
        boolean isOk = writer.verifyResultAgainstDataGenerator(result, true, true);
        Assert.assertTrue("Failed to verify [" + Bytes.toString(result.getRow()) + "]", isOk);
        ++resultCount;
      }
    }
    long timeTaken = EnvironmentEdgeManager.currentTime() - startTs;
    // Verify the result count.
    long onesGennedAfterScan = dataGen.getExpectedNumberOfKeys();
    Assert.assertTrue("Read " + resultCount + " keys when at most " + onesGennedAfterScan
        + " were generated ", onesGennedAfterScan >= resultCount);
    if (isWriterDone) {
      Assert.assertEquals("Read " + resultCount + " keys; the writer is done and "
          + onesGennedAfterScan + " keys were generated", onesGennedAfterScan, resultCount);
    } else if (onesGennedBeforeScan * 0.9 > resultCount) {
      LOG.warn("Read way too few keys ({}/{}) - there might be a problem, or the writer might "
          + "just be slow", resultCount, onesGennedBeforeScan);
    }
    LOG.info("Scan took {}ms", timeTaken);
  }
}