001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.client;
020
021import java.io.IOException;
022import java.util.List;
023
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.HConstants;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
028import org.apache.hadoop.hbase.util.Pair;
029import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
030import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
031import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
032import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
033import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
034import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
035import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
036import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
039import org.apache.hadoop.security.token.Token;
040
041import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
042
043/**
044 * Client proxy for SecureBulkLoadProtocol
045 */
046@InterfaceAudience.Private
047public class SecureBulkLoadClient {
048  private Table table;
049  private final RpcControllerFactory rpcControllerFactory;
050
051  public SecureBulkLoadClient(final Configuration conf, Table table) {
052    this.table = table;
053    this.rpcControllerFactory = new RpcControllerFactory(conf);
054  }
055
056  public String prepareBulkLoad(final Connection conn) throws IOException {
057    try {
058      ClientServiceCallable<String> callable = new ClientServiceCallable<String>(conn,
059          table.getName(), HConstants.EMPTY_START_ROW,
060          this.rpcControllerFactory.newController(), PRIORITY_UNSET) {
061        @Override
062        protected String rpcCall() throws Exception {
063          byte[] regionName = getLocation().getRegionInfo().getRegionName();
064          RegionSpecifier region =
065              RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
066          PrepareBulkLoadRequest request = PrepareBulkLoadRequest.newBuilder()
067              .setTableName(ProtobufUtil.toProtoTableName(table.getName()))
068              .setRegion(region).build();
069          PrepareBulkLoadResponse response = getStub().prepareBulkLoad(null, request);
070          return response.getBulkToken();
071        }
072      };
073      return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
074          .<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
075    } catch (Throwable throwable) {
076      throw new IOException(throwable);
077    }
078  }
079
080  public void cleanupBulkLoad(final Connection conn, final String bulkToken) throws IOException {
081    try {
082      ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn,
083          table.getName(), HConstants.EMPTY_START_ROW, this.rpcControllerFactory.newController(), PRIORITY_UNSET) {
084        @Override
085        protected Void rpcCall() throws Exception {
086          byte[] regionName = getLocation().getRegionInfo().getRegionName();
087          RegionSpecifier region = RequestConverter.buildRegionSpecifier(
088              RegionSpecifierType.REGION_NAME, regionName);
089          CleanupBulkLoadRequest request =
090              CleanupBulkLoadRequest.newBuilder().setRegion(region).setBulkToken(bulkToken).build();
091          getStub().cleanupBulkLoad(null, request);
092          return null;
093        }
094      };
095      RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
096          .<Void> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
097    } catch (Throwable throwable) {
098      throw new IOException(throwable);
099    }
100  }
101
102  /**
103   * Securely bulk load a list of HFiles using client protocol.
104   *
105   * @param client
106   * @param familyPaths
107   * @param regionName
108   * @param assignSeqNum
109   * @param userToken
110   * @param bulkToken
111   * @return true if all are loaded
112   * @throws IOException
113   */
114  public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
115      final List<Pair<byte[], String>> familyPaths,
116      final byte[] regionName, boolean assignSeqNum,
117      final Token<?> userToken, final String bulkToken) throws IOException {
118    return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken, bulkToken,
119        false);
120  }
121
122  /**
123   * Securely bulk load a list of HFiles using client protocol.
124   *
125   * @param client
126   * @param familyPaths
127   * @param regionName
128   * @param assignSeqNum
129   * @param userToken
130   * @param bulkToken
131   * @param copyFiles
132   * @return true if all are loaded
133   * @throws IOException
134   */
135  public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
136      final List<Pair<byte[], String>> familyPaths,
137      final byte[] regionName, boolean assignSeqNum,
138      final Token<?> userToken, final String bulkToken, boolean copyFiles) throws IOException {
139    BulkLoadHFileRequest request =
140        RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum,
141          userToken, bulkToken, copyFiles);
142
143    try {
144      BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
145      return response.getLoaded();
146    } catch (Exception se) {
147      throw ProtobufUtil.handleRemoteException(se);
148    }
149  }
150}