001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.concurrent.atomic.AtomicLong; 024import org.apache.hadoop.fs.FileSystem; 025import org.apache.hadoop.fs.Path; 026import org.apache.hadoop.hbase.HBaseClassTestRule; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; 029import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.client.ClientServiceCallable; 032import org.apache.hadoop.hbase.client.ClusterConnection; 033import org.apache.hadoop.hbase.client.RpcRetryingCaller; 034import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; 035import org.apache.hadoop.hbase.client.Table; 036import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 037import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 038import org.apache.hadoop.hbase.testclassification.LargeTests; 039import org.apache.hadoop.hbase.testclassification.RegionServerTests; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.hadoop.hbase.util.Pair; 042import org.junit.BeforeClass; 043import org.junit.ClassRule; 044import org.junit.Ignore; 045import org.junit.experimental.categories.Category; 046import org.junit.runner.RunWith; 047import org.junit.runners.Parameterized; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 052 053import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 055import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 056 057/** 058 * Tests bulk loading of HFiles with old secure Endpoint client for backward compatibility. Will be 059 * removed when old non-secure client for backward compatibility is not supported. 060 */ 061@RunWith(Parameterized.class) 062@Category({RegionServerTests.class, LargeTests.class}) 063@Ignore // BROKEN. FIX OR REMOVE. 064public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionServerBulkLoad { 065 @ClassRule 066 public static final HBaseClassTestRule CLASS_RULE = 067 HBaseClassTestRule.forClass(TestHRegionServerBulkLoadWithOldSecureEndpoint.class); 068 069 public TestHRegionServerBulkLoadWithOldSecureEndpoint(int duration) { 070 super(duration); 071 } 072 073 private static final Logger LOG = 074 LoggerFactory.getLogger(TestHRegionServerBulkLoadWithOldSecureEndpoint.class); 075 076 @BeforeClass 077 public static void setUpBeforeClass() throws IOException { 078 conf.setInt("hbase.rpc.timeout", 10 * 1000); 079 conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 080 "org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"); 081 } 082 083 public static class AtomicHFileLoader extends RepeatingTestThread { 084 final AtomicLong numBulkLoads = new AtomicLong(); 085 final AtomicLong numCompactions = new AtomicLong(); 086 private TableName tableName; 087 088 public AtomicHFileLoader(TableName tableName, TestContext ctx, byte[][] targetFamilies) 089 throws IOException { 090 super(ctx); 091 this.tableName = tableName; 092 } 093 094 public void doAnAction() throws Exception { 095 long iteration = numBulkLoads.getAndIncrement(); 096 Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", 097 iteration)); 098 099 // create HFiles for different column families 100 FileSystem fs = UTIL.getTestFileSystem(); 101 byte[] val = Bytes.toBytes(String.format("%010d", iteration)); 102 final List<Pair<byte[], String>> famPaths = new ArrayList<>(NUM_CFS); 103 for (int i = 0; i < NUM_CFS; i++) { 104 Path hfile = new Path(dir, family(i)); 105 byte[] fam = Bytes.toBytes(family(i)); 106 createHFile(fs, hfile, fam, QUAL, val, 1000); 107 famPaths.add(new Pair<>(fam, hfile.toString())); 108 } 109 110 // bulk load HFiles 111 final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection(); 112 Table table = conn.getTable(tableName); 113 final String bulkToken = new SecureBulkLoadEndpointClient(table).prepareBulkLoad(tableName); 114 RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration()); 115 ClientServiceCallable<Void> callable = 116 new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"), 117 rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { 118 @Override 119 protected Void rpcCall() throws Exception { 120 LOG.debug("Going to connect to server " + getLocation() + " for row " + 121 Bytes.toStringBinary(getRow())); 122 try (Table table = conn.getTable(getTableName())) { 123 boolean loaded = new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths, 124 null, bulkToken, getLocation().getRegionInfo().getStartKey()); 125 } 126 return null; 127 } 128 }; 129 RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf); 130 RpcRetryingCaller<Void> caller = factory.<Void> newCaller(); 131 caller.callWithRetries(callable, Integer.MAX_VALUE); 132 133 // Periodically do compaction to reduce the number of open file handles. 134 if (numBulkLoads.get() % 5 == 0) { 135 // 5 * 50 = 250 open file handles! 136 callable = new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"), 137 rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { 138 @Override 139 protected Void rpcCall() throws Exception { 140 LOG.debug("compacting " + getLocation() + " for row " 141 + Bytes.toStringBinary(getRow())); 142 AdminProtos.AdminService.BlockingInterface server = 143 conn.getAdmin(getLocation().getServerName()); 144 CompactRegionRequest request = 145 RequestConverter.buildCompactRegionRequest( 146 getLocation().getRegionInfo().getRegionName(), true, null); 147 server.compactRegion(null, request); 148 numCompactions.incrementAndGet(); 149 return null; 150 } 151 }; 152 caller.callWithRetries(callable, Integer.MAX_VALUE); 153 } 154 } 155 } 156 157 void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners) 158 throws Exception { 159 setupTable(tableName, 10); 160 161 TestContext ctx = new TestContext(UTIL.getConfiguration()); 162 163 AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null); 164 ctx.addThread(loader); 165 166 List<AtomicScanReader> scanners = Lists.newArrayList(); 167 for (int i = 0; i < numScanners; i++) { 168 AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families); 169 scanners.add(scanner); 170 ctx.addThread(scanner); 171 } 172 173 ctx.startThreads(); 174 ctx.waitFor(millisToRun); 175 ctx.stop(); 176 177 LOG.info("Loaders:"); 178 LOG.info(" loaded " + loader.numBulkLoads.get()); 179 LOG.info(" compations " + loader.numCompactions.get()); 180 181 LOG.info("Scanners:"); 182 for (AtomicScanReader scanner : scanners) { 183 LOG.info(" scanned " + scanner.numScans.get()); 184 LOG.info(" verified " + scanner.numRowsScanned.get() + " rows"); 185 } 186 } 187}