/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;

/**
 * Tests bulk loading of HFiles with the old non-secure client, kept for backward compatibility.
 * Will be removed once the old non-secure client is no longer supported.
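 * <p>
 * A loader thread drives the legacy {@code bulkLoadHFile} RPC directly against the region
 * server while concurrent scanner threads check that every row stays consistent across column
 * families, i.e. that each bulk load is applied atomically.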
 */
@RunWith(Parameterized.class)
@Category({RegionServerTests.class, LargeTests.class})
public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBulkLoad {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHRegionServerBulkLoadWithOldClient.class);

  private static final Logger LOG =
      LoggerFactory.getLogger(TestHRegionServerBulkLoadWithOldClient.class);

  public TestHRegionServerBulkLoadWithOldClient(int duration) {
    super(duration);
  }

  public static class AtomicHFileLoader extends RepeatingTestThread {
    final AtomicLong numBulkLoads = new AtomicLong();
    final AtomicLong numCompactions = new AtomicLong();
    private TableName tableName;

    // Note: targetFamilies is unused; every iteration writes to all NUM_CFS column families.
    public AtomicHFileLoader(TableName tableName, TestContext ctx, byte[][] targetFamilies)
        throws IOException {
      super(ctx);
      this.tableName = tableName;
    }

    @Override
    public void doAnAction() throws Exception {
      long iteration = numBulkLoads.getAndIncrement();
      Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", iteration));

      // create HFiles for different column families
      FileSystem fs = UTIL.getTestFileSystem();
      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
      final List<Pair<byte[], String>> famPaths = new ArrayList<>(NUM_CFS);
      for (int i = 0; i < NUM_CFS; i++) {
        Path hfile = new Path(dir, family(i));
        byte[] fam = Bytes.toBytes(family(i));
        createHFile(fs, hfile, fam, QUAL, val, 1000);
        famPaths.add(new Pair<>(fam, hfile.toString()));
      }

      // bulk load HFiles
      final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
      RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
      ClientServiceCallable<Void> callable =
          new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
              rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
            @Override
            protected Void rpcCall() throws Exception {
              LOG.info("Non-secure old client");
              byte[] regionName = getLocation().getRegionInfo().getRegionName();
              BulkLoadHFileRequest request =
                  RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true, null, null);
              getStub().bulkLoadHFile(null, request);
              return null;
            }
          };
      RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
      RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
      caller.callWithRetries(callable, Integer.MAX_VALUE);

      // Periodically do compaction to reduce the number of open file handles.
      if (numBulkLoads.get() % 5 == 0) {
        // 5 * 50 = 250 open file handles!
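        // Request a major compaction (the "true" flag in buildCompactRegionRequest) through the
        // admin protobuf stub so the store files piled up by repeated bulk loads get merged.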
        callable = new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
            rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
          @Override
          protected Void rpcCall() throws Exception {
            LOG.debug("compacting " + getLocation() + " for row "
                + Bytes.toStringBinary(getRow()));
            AdminProtos.AdminService.BlockingInterface server =
                conn.getAdmin(getLocation().getServerName());
            CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(
                getLocation().getRegionInfo().getRegionName(), true, null);
            server.compactRegion(null, request);
            numCompactions.incrementAndGet();
            return null;
          }
        };
        caller.callWithRetries(callable, Integer.MAX_VALUE);
      }
    }
  }

  @Override
  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
      throws Exception {
    setupTable(tableName, 10);

    TestContext ctx = new TestContext(UTIL.getConfiguration());

    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
    ctx.addThread(loader);

    List<AtomicScanReader> scanners = Lists.newArrayList();
    for (int i = 0; i < numScanners; i++) {
      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
      scanners.add(scanner);
      ctx.addThread(scanner);
    }

    ctx.startThreads();
    ctx.waitFor(millisToRun);
    ctx.stop();

    LOG.info("Loaders:");
    LOG.info("  loaded " + loader.numBulkLoads.get());
    LOG.info("  compactions " + loader.numCompactions.get());

    LOG.info("Scanners:");
    for (AtomicScanReader scanner : scanners) {
      LOG.info("  scanned " + scanner.numScans.get());
      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
    }
  }
}