/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.security.InvalidParameterException;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LoadTestKVGenerator;
import org.apache.hadoop.hbase.util.MultiThreadedWriter;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test that verifies lazy CF loading during scans by doing repeated scans with this
 * feature while multiple threads are continuously writing values; and verifying the result.
 */
@Tag(IntegrationTests.TAG)
public class IntegrationTestLazyCfLoading {
  private static final TableName TABLE_NAME =
    TableName.valueOf(IntegrationTestLazyCfLoading.class.getSimpleName());
  /** Config key template for the soft test timeout; formatted with the test class name. */
  @SuppressWarnings("InlineFormatString")
  private static final String TIMEOUT_KEY = "hbase.%s.timeout";
  /** Config key template for the data block encoding; formatted with the test class name. */
  @SuppressWarnings("InlineFormatString")
  private static final String ENCODING_KEY = "hbase.%s.datablock.encoding";

  /** A soft test timeout; duration of the test, as such, depends on number of keys to put. */
  private static final int DEFAULT_TIMEOUT_MINUTES = 10;

  private static final int NUM_SERVERS = 1;
  /** Set regions per server low to ensure splits happen during test */
  private static final int REGIONS_PER_SERVER = 3;
  private static final int KEYS_TO_WRITE_PER_SERVER = 20000;
  private static final int WRITER_THREADS = 10;
  private static final int WAIT_BETWEEN_SCANS_MS = 1000;

  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestLazyCfLoading.class);
  private IntegrationTestingUtility util = new IntegrationTestingUtility();
  private final DataGenerator dataGen = new DataGenerator();

  /**
   * Custom LoadTestDataGenerator. Uses key generation and verification from LoadTestKVGenerator.
   * Creates 3 column families; one with an integer column to filter on, the 2nd one with an integer
   * column that matches the first integer column (for test-specific verification), and byte[] value
   * that is used for general verification; and the third one with just the value.
   */
  private static class DataGenerator extends LoadTestDataGenerator {
    private static final int MIN_DATA_SIZE = 4096;
    private static final int MAX_DATA_SIZE = 65536;
    public static final byte[] ESSENTIAL_CF = Bytes.toBytes("essential");
    public static final byte[] JOINED_CF1 = Bytes.toBytes("joined");
    public static final byte[] JOINED_CF2 = Bytes.toBytes("joined2");
    public static final byte[] FILTER_COLUMN = Bytes.toBytes("filter");
    public static final byte[] VALUE_COLUMN = Bytes.toBytes("val");
    public static final long ACCEPTED_VALUE = 1L;

    // Maps each column family to the columns it contains. Instance (not static) state: the map
    // is populated in the constructor, so keeping it static would mean a second DataGenerator
    // instance mutated state shared with the first one.
    private final Map<byte[], byte[][]> columnMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);

    // Counters updated from writer threads while values are generated; see generateValue.
    private final AtomicLong expectedNumberOfKeys = new AtomicLong(0);
    private final AtomicLong totalNumberOfKeys = new AtomicLong(0);

    public DataGenerator() {
      super(MIN_DATA_SIZE, MAX_DATA_SIZE);
      columnMap.put(ESSENTIAL_CF, new byte[][] { FILTER_COLUMN });
      columnMap.put(JOINED_CF1, new byte[][] { FILTER_COLUMN, VALUE_COLUMN });
      columnMap.put(JOINED_CF2, new byte[][] { VALUE_COLUMN });
    }

    /** Returns the number of generated keys whose filter column matches ACCEPTED_VALUE. */
    public long getExpectedNumberOfKeys() {
      return expectedNumberOfKeys.get();
    }

    /** Returns the total number of keys for which values have been generated. */
    public long getTotalNumberOfKeys() {
      return totalNumberOfKeys.get();
    }

    @Override
    public byte[] getDeterministicUniqueKey(long keyBase) {
      return Bytes.toBytes(LoadTestKVGenerator.md5PrefixedKey(keyBase));
    }

    @Override
    public byte[][] getColumnFamilies() {
      return columnMap.keySet().toArray(new byte[columnMap.size()][]);
    }

    @Override
    public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
      return columnMap.get(cf);
    }

    @Override
    public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
      if (Bytes.BYTES_COMPARATOR.compare(column, FILTER_COLUMN) == 0) {
        // Random deterministic way to make some values "on" and others "off" for filters.
        long value = Long.parseLong(Bytes.toString(rowKey, 0, 4), 16) & ACCEPTED_VALUE;
        // Count each key once, using the essential CF as the representative family.
        if (Bytes.BYTES_COMPARATOR.compare(cf, ESSENTIAL_CF) == 0) {
          totalNumberOfKeys.incrementAndGet();
          if (value == ACCEPTED_VALUE) {
            expectedNumberOfKeys.incrementAndGet();
          }
        }
        return Bytes.toBytes(value);
      } else if (Bytes.BYTES_COMPARATOR.compare(column, VALUE_COLUMN) == 0) {
        return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
      }
      String error = "Unknown column " + Bytes.toString(column);
      assert false : error;
      throw new InvalidParameterException(error);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
      if (Bytes.BYTES_COMPARATOR.compare(column, FILTER_COLUMN) == 0) {
        // Relies on the filter from getScanFilter being used.
        return Bytes.toLong(value) == ACCEPTED_VALUE;
      } else if (Bytes.BYTES_COMPARATOR.compare(column, VALUE_COLUMN) == 0) {
        return LoadTestKVGenerator.verify(value, rowKey, cf, column);
      }
      return false; // some bogus value from read, we don't expect any such thing.
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
      return columnMap.get(cf).length == columnSet.size();
    }

    /** Builds the filter that accepts only rows whose filter column equals ACCEPTED_VALUE. */
    public Filter getScanFilter() {
      SingleColumnValueFilter scf = new SingleColumnValueFilter(ESSENTIAL_CF, FILTER_COLUMN,
        CompareOperator.EQUAL, Bytes.toBytes(ACCEPTED_VALUE));
      scf.setFilterIfMissing(true);
      return scf;
    }
  }

  @BeforeEach
  public void setUp() throws Exception {
    LOG.info("Initializing cluster with " + NUM_SERVERS + " servers");
    util.initializeCluster(NUM_SERVERS);
    LOG.info("Done initializing cluster");
    createTable();
    // after table creation, ACLs need time to be propagated to RSs in a secure deployment
    // so we sleep a little bit because we don't have a good way to know when permissions
    // are received by RSs
    Thread.sleep(3000);
  }

  /** Creates the test table with one CF per generator family, pre-split across the servers. */
  private void createTable() throws Exception {
    deleteTable();
    LOG.info("Creating table");
    Configuration conf = util.getConfiguration();
    String encodingKey = String.format(ENCODING_KEY, this.getClass().getSimpleName());
    DataBlockEncoding blockEncoding = DataBlockEncoding.valueOf(conf.get(encodingKey, "FAST_DIFF"));
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
    for (byte[] cf : dataGen.getColumnFamilies()) {
      ColumnFamilyDescriptor familyDescriptor =
        ColumnFamilyDescriptorBuilder.newBuilder(cf).setDataBlockEncoding(blockEncoding).build();
      builder.setColumnFamily(familyDescriptor);
    }
    int serverCount =
      util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size();
    byte[][] splits = new RegionSplitter.HexStringSplit().split(serverCount * REGIONS_PER_SERVER);
    util.getAdmin().createTable(builder.build(), splits);
    LOG.info("Created table");
  }

  /** Drops the test table if it exists; no-op otherwise. */
  private void deleteTable() throws Exception {
    if (util.getAdmin().tableExists(TABLE_NAME)) {
      LOG.info("Deleting table");
      util.deleteTable(TABLE_NAME);
      LOG.info("Deleted table");
    }
  }

  @AfterEach
  public void tearDown() throws Exception {
    deleteTable();
    LOG.info("Restoring the cluster");
    util.restoreCluster();
    LOG.info("Done restoring the cluster");
  }

  @Test
  public void testReadersAndWriters() throws Exception {
    Configuration conf = util.getConfiguration();
    String timeoutKey = String.format(TIMEOUT_KEY, this.getClass().getSimpleName());
    long maxRuntime = conf.getLong(timeoutKey, DEFAULT_TIMEOUT_MINUTES);
    long serverCount =
      util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size();
    long keysToWrite = serverCount * KEYS_TO_WRITE_PER_SERVER;
    // try-with-resources so that the connection and table are closed even when an assertion
    // or scan failure throws out of the loop below (previously they leaked on that path).
    try (Connection connection = ConnectionFactory.createConnection(conf);
      Table table = connection.getTable(TABLE_NAME)) {

      // Create multi-threaded writer and start it. We write multiple columns/CFs and verify
      // their integrity, therefore multi-put is necessary.
      MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
      writer.setMultiPut(true);

      LOG.info("Starting writer; the number of keys to write is " + keysToWrite);
      // TODO : Need to see if tag support has to be given here in the integration test suite
      writer.start(1, keysToWrite, WRITER_THREADS);

      // Now, do scans.
      long now = EnvironmentEdgeManager.currentTime();
      long timeLimit = now + (maxRuntime * 60000);
      boolean isWriterDone = false;
      while (now < timeLimit && !isWriterDone) {
        LOG.info(
          "Starting the scan; wrote approximately " + dataGen.getTotalNumberOfKeys() + " keys");
        isWriterDone = writer.isDone();
        if (isWriterDone) {
          LOG.info("Scanning full result, writer is done");
        }
        Scan scan = new Scan();
        for (byte[] cf : dataGen.getColumnFamilies()) {
          scan.addFamily(cf);
        }
        scan.setFilter(dataGen.getScanFilter());
        scan.setLoadColumnFamiliesOnDemand(true);
        // The number of keys we can expect from scan - lower bound (before scan).
        // Not a strict lower bound - writer knows nothing about filters, so we report
        // this from generator. Writer might have generated the value but not put it yet.
        long onesGennedBeforeScan = dataGen.getExpectedNumberOfKeys();
        long startTs = EnvironmentEdgeManager.currentTime();
        long resultCount = 0;
        // Close the scanner each iteration; it was previously leaked once per scan.
        try (ResultScanner results = table.getScanner(scan)) {
          Result result = null;
          // Verify and count the results.
          while ((result = results.next()) != null) {
            boolean isOk = writer.verifyResultAgainstDataGenerator(result, true, true);
            assertTrue(isOk, "Failed to verify [" + Bytes.toString(result.getRow()) + "]");
            ++resultCount;
          }
        }
        long timeTaken = EnvironmentEdgeManager.currentTime() - startTs;
        // Verify the result count.
        long onesGennedAfterScan = dataGen.getExpectedNumberOfKeys();
        assertTrue(onesGennedAfterScan >= resultCount,
          "Read " + resultCount + " keys when at most " + onesGennedAfterScan + " were generated ");
        if (isWriterDone) {
          assertTrue(onesGennedAfterScan == resultCount, "Read " + resultCount
            + " keys; the writer is done and " + onesGennedAfterScan + " keys were generated");
        } else if (onesGennedBeforeScan * 0.9 > resultCount) {
          LOG.warn("Read way too few keys (" + resultCount + "/" + onesGennedBeforeScan
            + ") - there might be a problem, or the writer might just be slow");
        }
        LOG.info("Scan took " + timeTaken + "ms");
        if (!isWriterDone) {
          Thread.sleep(WAIT_BETWEEN_SCANS_MS);
          now = EnvironmentEdgeManager.currentTime();
        }
      }
      assertEquals(0, writer.getNumWriteFailures(), "There are write failures");
      assertTrue(isWriterDone, "Writer is not done");
    }
  }
}