001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023
024import org.apache.hadoop.hbase.HConstants;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.client.Table;
027import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
028import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
029import org.apache.hadoop.hbase.ipc.ServerRpcController;
030import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
031import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
032import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
033import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
034import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken;
035import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
036import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
037import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos;
038import org.apache.hadoop.hbase.util.ByteStringer;
039import org.apache.hadoop.hbase.util.Pair;
040import org.apache.hadoop.security.token.Token;
041import org.apache.yetus.audience.InterfaceAudience;
042
043/**
044 * Client proxy for SecureBulkLoadProtocol used in conjunction with SecureBulkLoadEndpoint
045 * @deprecated Use for backward compatibility testing only. Will be removed when
046 *             SecureBulkLoadEndpoint is not supported.
047 */
048@Deprecated
049@InterfaceAudience.Private
050public class SecureBulkLoadEndpointClient {
051  private Table table;
052
053  public SecureBulkLoadEndpointClient(Table table) {
054    this.table = table;
055  }
056
057  public String prepareBulkLoad(final TableName tableName) throws IOException {
058    try {
059      CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
060      SecureBulkLoadProtos.SecureBulkLoadService instance =
061          ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
062
063      ServerRpcController controller = new ServerRpcController();
064
065      CoprocessorRpcUtils.BlockingRpcCallback<PrepareBulkLoadResponse> rpcCallback =
066          new CoprocessorRpcUtils.BlockingRpcCallback<>();
067
068      PrepareBulkLoadRequest request =
069          PrepareBulkLoadRequest.newBuilder()
070          .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
071
072      instance.prepareBulkLoad(controller, request, rpcCallback);
073
074      PrepareBulkLoadResponse response = rpcCallback.get();
075      if (controller.failedOnException()) {
076        throw controller.getFailedOn();
077      }
078
079      return response.getBulkToken();
080    } catch (Throwable throwable) {
081      throw new IOException(throwable);
082    }
083  }
084
085  public void cleanupBulkLoad(final String bulkToken) throws IOException {
086    try {
087      CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
088      SecureBulkLoadProtos.SecureBulkLoadService instance =
089          ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
090
091      ServerRpcController controller = new ServerRpcController();
092
093      CoprocessorRpcUtils.BlockingRpcCallback<CleanupBulkLoadResponse> rpcCallback =
094          new CoprocessorRpcUtils.BlockingRpcCallback<>();
095
096      CleanupBulkLoadRequest request =
097          CleanupBulkLoadRequest.newBuilder()
098              .setBulkToken(bulkToken).build();
099
100      instance.cleanupBulkLoad(controller,
101          request,
102          rpcCallback);
103
104      if (controller.failedOnException()) {
105        throw controller.getFailedOn();
106      }
107    } catch (Throwable throwable) {
108      throw new IOException(throwable);
109    }
110  }
111
112  public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
113          final Token<?> userToken, final String bulkToken, final byte[] startRow)
114          throws IOException {
115    // we never want to send a batch of HFiles to all regions, thus cannot call
116    // HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639
117    try {
118      CoprocessorRpcChannel channel = table.coprocessorService(startRow);
119      SecureBulkLoadProtos.SecureBulkLoadService instance =
120          ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
121
122      DelegationToken protoDT =
123          DelegationToken.newBuilder().build();
124      if(userToken != null) {
125        protoDT =
126            DelegationToken.newBuilder()
127              .setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
128              .setPassword(ByteStringer.wrap(userToken.getPassword()))
129              .setKind(userToken.getKind().toString())
130              .setService(userToken.getService().toString()).build();
131      }
132
133      List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths =
134          new ArrayList<>(familyPaths.size());
135      for(Pair<byte[], String> el: familyPaths) {
136        protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder()
137          .setFamily(ByteStringer.wrap(el.getFirst()))
138          .setPath(el.getSecond()).build());
139      }
140
141      SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request =
142          SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder()
143            .setFsToken(protoDT)
144            .addAllFamilyPath(protoFamilyPaths)
145            .setBulkToken(bulkToken).build();
146
147      ServerRpcController controller = new ServerRpcController();
148      CoprocessorRpcUtils.BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse>
149            rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>();
150      instance.secureBulkLoadHFiles(controller,
151        request,
152        rpcCallback);
153
154      SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get();
155      if (controller.failedOnException()) {
156        throw controller.getFailedOn();
157      }
158      return response.getLoaded();
159    } catch (Throwable throwable) {
160      throw new IOException(throwable);
161    }
162  }
163}