001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.concurrent.atomic.AtomicLong;
024import org.apache.hadoop.fs.FileSystem;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
029import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.client.ClientServiceCallable;
032import org.apache.hadoop.hbase.client.ClusterConnection;
033import org.apache.hadoop.hbase.client.RpcRetryingCaller;
034import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
035import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
036import org.apache.hadoop.hbase.testclassification.LargeTests;
037import org.apache.hadoop.hbase.testclassification.RegionServerTests;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.Pair;
040import org.junit.ClassRule;
041import org.junit.experimental.categories.Category;
042import org.junit.runner.RunWith;
043import org.junit.runners.Parameterized;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
048
049import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
053
054/**
055 * Tests bulk loading of HFiles with old non-secure client for backward compatibility. Will be
056 * removed when old non-secure client for backward compatibility is not supported.
057 */
058@RunWith(Parameterized.class)
059@Category({RegionServerTests.class, LargeTests.class})
060public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBulkLoad {
061
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064      HBaseClassTestRule.forClass(TestHRegionServerBulkLoadWithOldClient.class);
065
066  private static final Logger LOG =
067      LoggerFactory.getLogger(TestHRegionServerBulkLoadWithOldClient.class);
068
069  public TestHRegionServerBulkLoadWithOldClient(int duration) {
070    super(duration);
071  }
072
073  public static class AtomicHFileLoader extends RepeatingTestThread {
074    final AtomicLong numBulkLoads = new AtomicLong();
075    final AtomicLong numCompactions = new AtomicLong();
076    private TableName tableName;
077
078    public AtomicHFileLoader(TableName tableName, TestContext ctx,
079        byte targetFamilies[][]) throws IOException {
080      super(ctx);
081      this.tableName = tableName;
082    }
083
084    @Override
085    public void doAnAction() throws Exception {
086      long iteration = numBulkLoads.getAndIncrement();
087      Path dir =  UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d",
088          iteration));
089
090      // create HFiles for different column families
091      FileSystem fs = UTIL.getTestFileSystem();
092      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
093      final List<Pair<byte[], String>> famPaths = new ArrayList<>(NUM_CFS);
094      for (int i = 0; i < NUM_CFS; i++) {
095        Path hfile = new Path(dir, family(i));
096        byte[] fam = Bytes.toBytes(family(i));
097        createHFile(fs, hfile, fam, QUAL, val, 1000);
098        famPaths.add(new Pair<>(fam, hfile.toString()));
099      }
100
101      // bulk load HFiles
102      final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
103      RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
104      ClientServiceCallable<Void> callable =
105          new ClientServiceCallable<Void>(conn, tableName,
106              Bytes.toBytes("aaa"), rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
107        @Override
108        protected Void rpcCall() throws Exception {
109          LOG.info("Non-secure old client");
110          byte[] regionName = getLocation().getRegionInfo().getRegionName();
111              BulkLoadHFileRequest request =
112                  RequestConverter
113                      .buildBulkLoadHFileRequest(famPaths, regionName, true, null, null);
114              getStub().bulkLoadHFile(null, request);
115              return null;
116        }
117      };
118      RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
119      RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
120      caller.callWithRetries(callable, Integer.MAX_VALUE);
121
122      // Periodically do compaction to reduce the number of open file handles.
123      if (numBulkLoads.get() % 5 == 0) {
124        // 5 * 50 = 250 open file handles!
125        callable = new ClientServiceCallable<Void>(conn, tableName,
126            Bytes.toBytes("aaa"), rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
127          @Override
128          protected Void rpcCall() throws Exception {
129            LOG.debug("compacting " + getLocation() + " for row "
130                + Bytes.toStringBinary(getRow()));
131            AdminProtos.AdminService.BlockingInterface server =
132              conn.getAdmin(getLocation().getServerName());
133            CompactRegionRequest request =
134              RequestConverter.buildCompactRegionRequest(
135                getLocation().getRegionInfo().getRegionName(), true, null);
136            server.compactRegion(null, request);
137            numCompactions.incrementAndGet();
138            return null;
139          }
140        };
141        caller.callWithRetries(callable, Integer.MAX_VALUE);
142      }
143    }
144  }
145
146  @Override
147  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
148      throws Exception {
149    setupTable(tableName, 10);
150
151    TestContext ctx = new TestContext(UTIL.getConfiguration());
152
153    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
154    ctx.addThread(loader);
155
156    List<AtomicScanReader> scanners = Lists.newArrayList();
157    for (int i = 0; i < numScanners; i++) {
158      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
159      scanners.add(scanner);
160      ctx.addThread(scanner);
161    }
162
163    ctx.startThreads();
164    ctx.waitFor(millisToRun);
165    ctx.stop();
166
167    LOG.info("Loaders:");
168    LOG.info("  loaded " + loader.numBulkLoads.get());
169    LOG.info("  compations " + loader.numCompactions.get());
170
171    LOG.info("Scanners:");
172    for (AtomicScanReader scanner : scanners) {
173      LOG.info("  scanned " + scanner.numScans.get());
174      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
175    }
176  }
177}