View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client.coprocessor;
20  
21  import org.apache.hadoop.hbase.client.Table;
22  import org.apache.hadoop.hbase.util.ByteStringer;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.fs.Path;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.TableName;
31  import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
32  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
33  import org.apache.hadoop.hbase.ipc.ServerRpcController;
34  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
35  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
36  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos;
37  import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
38  import org.apache.hadoop.hbase.util.Pair;
39  import org.apache.hadoop.security.token.Token;
40  
41  import java.io.IOException;
42  import java.util.ArrayList;
43  import java.util.List;
44  
45  /**
46   * Client proxy for SecureBulkLoadProtocol
47   * used in conjunction with SecureBulkLoadEndpoint
48   */
49  @InterfaceAudience.Private
50  public class SecureBulkLoadClient {
51    private Table table;
52  
53    public SecureBulkLoadClient(Table table) {
54      this.table = table;
55    }
56  
57    public String prepareBulkLoad(final TableName tableName) throws IOException {
58      try {
59        CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
60        SecureBulkLoadProtos.SecureBulkLoadService instance =
61            ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
62  
63        ServerRpcController controller = new ServerRpcController();
64  
65        BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse> rpcCallback =
66            new BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse>();
67  
68        SecureBulkLoadProtos.PrepareBulkLoadRequest request =
69            SecureBulkLoadProtos.PrepareBulkLoadRequest.newBuilder()
70            .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
71  
72        instance.prepareBulkLoad(controller,
73            request,
74            rpcCallback);
75  
76        SecureBulkLoadProtos.PrepareBulkLoadResponse response = rpcCallback.get();
77        if (controller.failedOnException()) {
78          throw controller.getFailedOn();
79        }
80  
81        return response.getBulkToken();
82      } catch (Throwable throwable) {
83        throw new IOException(throwable);
84      }
85    }
86  
87    public void cleanupBulkLoad(final String bulkToken) throws IOException {
88      try {
89        CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
90        SecureBulkLoadProtos.SecureBulkLoadService instance =
91            ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
92  
93        ServerRpcController controller = new ServerRpcController();
94  
95        BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse> rpcCallback =
96            new BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse>();
97  
98        SecureBulkLoadProtos.CleanupBulkLoadRequest request =
99            SecureBulkLoadProtos.CleanupBulkLoadRequest.newBuilder()
100               .setBulkToken(bulkToken).build();
101 
102       instance.cleanupBulkLoad(controller,
103           request,
104           rpcCallback);
105 
106       if (controller.failedOnException()) {
107         throw controller.getFailedOn();
108       }
109     } catch (Throwable throwable) {
110       throw new IOException(throwable);
111     }
112   }
113 
114   public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
115                          final Token<?> userToken,
116                          final String bulkToken,
117                          final byte[] startRow) throws IOException {
118     // we never want to send a batch of HFiles to all regions, thus cannot call
119     // HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639
120     try {
121       CoprocessorRpcChannel channel = table.coprocessorService(startRow);
122       SecureBulkLoadProtos.SecureBulkLoadService instance =
123           ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
124 
125       SecureBulkLoadProtos.DelegationToken protoDT =
126           SecureBulkLoadProtos.DelegationToken.newBuilder().build();
127       if(userToken != null) {
128         protoDT =
129             SecureBulkLoadProtos.DelegationToken.newBuilder()
130               .setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
131               .setPassword(ByteStringer.wrap(userToken.getPassword()))
132               .setKind(userToken.getKind().toString())
133               .setService(userToken.getService().toString()).build();
134       }
135 
136       List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths =
137           new ArrayList<ClientProtos.BulkLoadHFileRequest.FamilyPath>();
138       for(Pair<byte[], String> el: familyPaths) {
139         protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder()
140           .setFamily(ByteStringer.wrap(el.getFirst()))
141           .setPath(el.getSecond()).build());
142       }
143 
144       SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request =
145           SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder()
146             .setFsToken(protoDT)
147             .addAllFamilyPath(protoFamilyPaths)
148             .setBulkToken(bulkToken).build();
149 
150       ServerRpcController controller = new ServerRpcController();
151       BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse> rpcCallback =
152           new BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse>();
153       instance.secureBulkLoadHFiles(controller,
154         request,
155         rpcCallback);
156 
157       SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get();
158       if (controller.failedOnException()) {
159         throw controller.getFailedOn();
160       }
161       return response.getLoaded();
162     } catch (Throwable throwable) {
163       throw new IOException(throwable);
164     }
165   }
166 
167   public Path getStagingPath(String bulkToken, byte[] family) throws IOException {
168     return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), bulkToken, family);
169   }
170 }