001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
021
022import java.io.IOException;
023import java.util.List;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.HConstants;
026import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
027import org.apache.hadoop.hbase.util.Pair;
028import org.apache.hadoop.security.token.Token;
029import org.apache.yetus.audience.InterfaceAudience;
030
031import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
032import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
033import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
034import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
035import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
036import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
041
042/**
043 * Client proxy for SecureBulkLoadProtocol
044 */
045@InterfaceAudience.Private
046public class SecureBulkLoadClient {
047  private Table table;
048  private final RpcControllerFactory rpcControllerFactory;
049
050  public SecureBulkLoadClient(final Configuration conf, Table table) {
051    this.table = table;
052    this.rpcControllerFactory = new RpcControllerFactory(conf);
053  }
054
055  public String prepareBulkLoad(final Connection conn) throws IOException {
056    try {
057      ClientServiceCallable<String> callable =
058        new ClientServiceCallable<String>(conn, table.getName(), HConstants.EMPTY_START_ROW,
059          this.rpcControllerFactory.newController(), PRIORITY_UNSET) {
060          @Override
061          protected String rpcCall() throws Exception {
062            byte[] regionName = getLocation().getRegionInfo().getRegionName();
063            RegionSpecifier region =
064              RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
065            PrepareBulkLoadRequest request = PrepareBulkLoadRequest.newBuilder()
066              .setTableName(ProtobufUtil.toProtoTableName(table.getName())).setRegion(region)
067              .build();
068            PrepareBulkLoadResponse response = getStub().prepareBulkLoad(null, request);
069            return response.getBulkToken();
070          }
071        };
072      return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
073        .<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
074    } catch (Throwable throwable) {
075      throw new IOException(throwable);
076    }
077  }
078
079  public void cleanupBulkLoad(final Connection conn, final String bulkToken) throws IOException {
080    try {
081      ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn, table.getName(),
082        HConstants.EMPTY_START_ROW, this.rpcControllerFactory.newController(), PRIORITY_UNSET) {
083        @Override
084        protected Void rpcCall() throws Exception {
085          byte[] regionName = getLocation().getRegionInfo().getRegionName();
086          RegionSpecifier region =
087            RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
088          CleanupBulkLoadRequest request =
089            CleanupBulkLoadRequest.newBuilder().setRegion(region).setBulkToken(bulkToken).build();
090          getStub().cleanupBulkLoad(null, request);
091          return null;
092        }
093      };
094      RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null).<Void> newCaller()
095        .callWithRetries(callable, Integer.MAX_VALUE);
096    } catch (Throwable throwable) {
097      throw new IOException(throwable);
098    }
099  }
100
101  /**
102   * Securely bulk load a list of HFiles using client protocol. nnnnnn * @return true if all are
103   * loaded n
104   */
105  public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
106    final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
107    final Token<?> userToken, final String bulkToken) throws IOException {
108    return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken, bulkToken,
109      false, null, true);
110  }
111
112  /**
113   * Securely bulk load a list of HFiles using client protocol. nnnnnnn * @return true if all are
114   * loaded n
115   */
116  public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
117    final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
118    final Token<?> userToken, final String bulkToken, boolean copyFiles) throws IOException {
119    return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken, bulkToken,
120      copyFiles, null, true);
121  }
122
123  public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
124    final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
125    final Token<?> userToken, final String bulkToken, boolean copyFiles, List<String> clusterIds,
126    boolean replicate) throws IOException {
127    BulkLoadHFileRequest request = RequestConverter.buildBulkLoadHFileRequest(familyPaths,
128      regionName, assignSeqNum, userToken, bulkToken, copyFiles, clusterIds, replicate);
129
130    try {
131      BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
132      return response.getLoaded();
133    } catch (Exception se) {
134      throw ProtobufUtil.handleRemoteException(se);
135    }
136  }
137}