/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
/**
 * Tests bulk loading of HFiles with the old secure Endpoint client for backward
 * compatibility. To be removed once the old secure Endpoint client is no longer
 * supported.
 */
@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, LargeTests.class })
@Ignore // BROKEN. FIX OR REMOVE.
public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionServerBulkLoad {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHRegionServerBulkLoadWithOldSecureEndpoint.class);

  private static final Logger LOG =
      LoggerFactory.getLogger(TestHRegionServerBulkLoadWithOldSecureEndpoint.class);

  public TestHRegionServerBulkLoadWithOldSecureEndpoint(int duration) {
    super(duration);
  }

  @BeforeClass
  public static void setUpBeforeClass() throws IOException {
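    // Shorten the RPC timeout so a wedged bulk-load call fails fast, and install the
    // deprecated SecureBulkLoadEndpoint coprocessor that the old client talks to.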
    conf.setInt("hbase.rpc.timeout", 10 * 1000);
    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      "org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
  }

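  /**
   * Load thread: each iteration stages one fresh HFile per column family and bulk
   * loads them through the old secure Endpoint client, triggering a compaction every
   * fifth round so the number of open file handles stays bounded.
   */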
  public static class AtomicHFileLoader extends RepeatingTestThread {
    final AtomicLong numBulkLoads = new AtomicLong();
    final AtomicLong numCompactions = new AtomicLong();
    private TableName tableName;

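    // targetFamilies is accepted for constructor parity with the parent test's
    // loader but is not used by this implementation.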
    public AtomicHFileLoader(TableName tableName, TestContext ctx, byte[][] targetFamilies)
        throws IOException {
      super(ctx);
      this.tableName = tableName;
    }

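    /**
     * One iteration: write one HFile per column family to the test filesystem, then
     * bulk load all of them into the region that holds row "aaa".
     */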
    @Override
    public void doAnAction() throws Exception {
      long iteration = numBulkLoads.getAndIncrement();
      Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", iteration));

      // create HFiles for different column families
      FileSystem fs = UTIL.getTestFileSystem();
      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
      final List<Pair<byte[], String>> famPaths = new ArrayList<>(NUM_CFS);
      for (int i = 0; i < NUM_CFS; i++) {
        Path hfile = new Path(dir, family(i));
        byte[] fam = Bytes.toBytes(family(i));
        createHFile(fs, hfile, fam, QUAL, val, 1000);
        famPaths.add(new Pair<>(fam, hfile.toString()));
      }

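      // Secure bulk load via the old Endpoint is a two-step handshake: prepareBulkLoad
      // obtains a staging token from the server, and bulkLoadHFiles then moves the
      // staged files into the region under that token.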
      final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
      final String bulkToken;
      try (Table table = conn.getTable(tableName)) {
        bulkToken = new SecureBulkLoadEndpointClient(table).prepareBulkLoad(tableName);
      }
      RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
      ClientServiceCallable<Void> callable =
        new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
            rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
          @Override
          protected Void rpcCall() throws Exception {
            LOG.debug("Going to connect to server " + getLocation() + " for row " +
                Bytes.toStringBinary(getRow()));
            try (Table table = conn.getTable(getTableName())) {
              new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths, null, bulkToken,
                  getLocation().getRegionInfo().getStartKey());
            }
            return null;
          }
        };
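      // Drive the bulk-load RPC through the standard retrying caller so transient
      // region moves or splits are retried instead of failing the iteration.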
      RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
      RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
      caller.callWithRetries(callable, Integer.MAX_VALUE);

      // Periodically do compaction to reduce the number of open file handles.
      if (numBulkLoads.get() % 5 == 0) {
        // 5 * 50 = 250 open file handles!
        callable = new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
            rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
          @Override
          protected Void rpcCall() throws Exception {
            LOG.debug("compacting " + getLocation() + " for row "
                + Bytes.toStringBinary(getRow()));
            AdminProtos.AdminService.BlockingInterface server =
                conn.getAdmin(getLocation().getServerName());
            CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(
                getLocation().getRegionInfo().getRegionName(), true, null);
            server.compactRegion(null, request);
            numCompactions.incrementAndGet();
            return null;
          }
        };
        caller.callWithRetries(callable, Integer.MAX_VALUE);
      }
    }
  }

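  /**
   * Runs one loader thread plus {@code numScanners} scanner threads against
   * {@code tableName} for {@code millisToRun} milliseconds, then logs per-thread
   * bulk-load, compaction, and scan counts.
   */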
  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
      throws Exception {
    setupTable(tableName, 10);

    TestContext ctx = new TestContext(UTIL.getConfiguration());

    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
    ctx.addThread(loader);

    List<AtomicScanReader> scanners = Lists.newArrayList();
    for (int i = 0; i < numScanners; i++) {
      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
      scanners.add(scanner);
      ctx.addThread(scanner);
    }

    ctx.startThreads();
    ctx.waitFor(millisToRun);
    ctx.stop();

    LOG.info("Loaders:");
    LOG.info("  loaded " + loader.numBulkLoads.get());
    LOG.info("  compactions " + loader.numCompactions.get());

    LOG.info("Scanners:");
    for (AtomicScanReader scanner : scanners) {
      LOG.info("  scanned " + scanner.numScans.get());
      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
    }
  }
}