/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;

/**
 * Tests bulk loading of HFiles with the old non-secure client, kept only for backward
 * compatibility. Will be removed once the old non-secure client is no longer supported.
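 * <p>
 * The "old client" path here builds a {@link BulkLoadHFileRequest} with neither a user token nor
 * a bulk token (both arguments are {@code null}) and invokes {@code bulkLoadHFile} on the region
 * server stub directly, rather than going through the secure bulk load machinery.
 * </p>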
 */
@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, LargeTests.class })
public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBulkLoad {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHRegionServerBulkLoadWithOldClient.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestHRegionServerBulkLoadWithOldClient.class);

  public TestHRegionServerBulkLoadWithOldClient(int duration) {
    super(duration);
  }

  public static class AtomicHFileLoader extends RepeatingTestThread {
    final AtomicLong numBulkLoads = new AtomicLong();
    final AtomicLong numCompactions = new AtomicLong();
    private TableName tableName;

    public AtomicHFileLoader(TableName tableName, TestContext ctx, byte[][] targetFamilies)
      throws IOException {
      super(ctx);
      this.tableName = tableName;
    }

    @Override
    public void doAnAction() throws Exception {
      long iteration = numBulkLoads.getAndIncrement();
      Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", iteration));

      // create HFiles for different column families
      FileSystem fs = UTIL.getTestFileSystem();
      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
      final List<Pair<byte[], String>> famPaths = new ArrayList<>(NUM_CFS);
      for (int i = 0; i < NUM_CFS; i++) {
        Path hfile = new Path(dir, family(i));
        byte[] fam = Bytes.toBytes(family(i));
        createHFile(fs, hfile, fam, QUAL, val, 1000);
        famPaths.add(new Pair<>(fam, hfile.toString()));
      }

      // bulk load HFiles
      final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
      RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
      ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn, tableName,
        Bytes.toBytes("aaa"), rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
        @Override
        protected Void rpcCall() throws Exception {
          LOG.info("Non-secure old client");
          byte[] regionName = getLocation().getRegionInfo().getRegionName();
          BulkLoadHFileRequest request =
            RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true, null, null);
          getStub().bulkLoadHFile(null, request);
          return null;
        }
      };
      RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
      RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
      caller.callWithRetries(callable, Integer.MAX_VALUE);

      // Periodically do compaction to reduce the number of open file handles.
      if (numBulkLoads.get() % 5 == 0) {
        // 5 * 50 = 250 open file handles!
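        // Send the compaction request straight to the admin stub of the server hosting
        // the region for row "aaa"; the 'true' argument below asks for a major compaction.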
        callable = new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
          rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
          @Override
          protected Void rpcCall() throws Exception {
            LOG.debug("compacting " + getLocation() + " for row " + Bytes.toStringBinary(getRow()));
            AdminProtos.AdminService.BlockingInterface server =
              conn.getAdmin(getLocation().getServerName());
            CompactRegionRequest request = RequestConverter
              .buildCompactRegionRequest(getLocation().getRegionInfo().getRegionName(), true, null);
            server.compactRegion(null, request);
            numCompactions.incrementAndGet();
            return null;
          }
        };
        caller.callWithRetries(callable, Integer.MAX_VALUE);
      }
    }
  }

  @Override
  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
    throws Exception {
    setupTable(tableName, 10);

    TestContext ctx = new TestContext(UTIL.getConfiguration());

    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
    ctx.addThread(loader);

    List<AtomicScanReader> scanners = Lists.newArrayList();
    for (int i = 0; i < numScanners; i++) {
      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
      scanners.add(scanner);
      ctx.addThread(scanner);
    }

    ctx.startThreads();
    ctx.waitFor(millisToRun);
    ctx.stop();

    LOG.info("Loaders:");
    LOG.info("  loaded " + loader.numBulkLoads.get());
    LOG.info("  compactions " + loader.numCompactions.get());

    LOG.info("Scanners:");
    for (AtomicScanReader scanner : scanners) {
      LOG.info("  scanned " + scanner.numScans.get());
      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
    }
  }
}