View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client.coprocessor;
20  
21  import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
22  import static org.apache.hadoop.hbase.HConstants.LAST_ROW;
23  
24  import java.io.IOException;
25  import java.util.ArrayList;
26  import java.util.List;
27  
28  import org.apache.hadoop.fs.Path;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.TableName;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.client.Table;
33  import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
34  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
35  import org.apache.hadoop.hbase.ipc.ServerRpcController;
36  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
37  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
38  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos;
39  import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
40  import org.apache.hadoop.hbase.util.ByteStringer;
41  import org.apache.hadoop.hbase.util.Pair;
42  import org.apache.hadoop.security.token.Token;
43  
44  /**
45   * Client proxy for SecureBulkLoadProtocol
46   * used in conjunction with SecureBulkLoadEndpoint
47   */
48  @InterfaceAudience.Private
49  public class SecureBulkLoadClient {
50    private Table table;
51  
52    public SecureBulkLoadClient(Table table) {
53      this.table = table;
54    }
55  
56    public String prepareBulkLoad(final TableName tableName) throws IOException {
57      try {
58        CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
59        SecureBulkLoadProtos.SecureBulkLoadService instance =
60            ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
61  
62        ServerRpcController controller = new ServerRpcController();
63  
64        BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse> rpcCallback =
65            new BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse>();
66  
67        SecureBulkLoadProtos.PrepareBulkLoadRequest request =
68            SecureBulkLoadProtos.PrepareBulkLoadRequest.newBuilder()
69            .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
70  
71        instance.prepareBulkLoad(controller,
72            request,
73            rpcCallback);
74  
75        SecureBulkLoadProtos.PrepareBulkLoadResponse response = rpcCallback.get();
76        if (controller.failedOnException()) {
77          throw controller.getFailedOn();
78        }
79        
80        return response.getBulkToken();
81      } catch (Throwable throwable) {
82        throw new IOException(throwable);
83      }
84    }
85  
86    public void cleanupBulkLoad(final String bulkToken) throws IOException {
87      try {
88        CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
89        SecureBulkLoadProtos.SecureBulkLoadService instance =
90            ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
91  
92        ServerRpcController controller = new ServerRpcController();
93  
94        BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse> rpcCallback =
95            new BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse>();
96  
97        SecureBulkLoadProtos.CleanupBulkLoadRequest request =
98            SecureBulkLoadProtos.CleanupBulkLoadRequest.newBuilder()
99                .setBulkToken(bulkToken).build();
100 
101       instance.cleanupBulkLoad(controller,
102           request,
103           rpcCallback);
104 
105       if (controller.failedOnException()) {
106         throw controller.getFailedOn();
107       }
108     } catch (Throwable throwable) {
109       throw new IOException(throwable);
110     }
111   }
112 
113   public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
114                          final Token<?> userToken,
115                          final String bulkToken,
116                          final byte[] startRow) throws IOException {
117     // we never want to send a batch of HFiles to all regions, thus cannot call
118     // HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639
119     try {
120       CoprocessorRpcChannel channel = table.coprocessorService(startRow);
121       SecureBulkLoadProtos.SecureBulkLoadService instance =
122           ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
123 
124       SecureBulkLoadProtos.DelegationToken protoDT =
125           SecureBulkLoadProtos.DelegationToken.newBuilder().build();
126       if(userToken != null) {
127         protoDT =
128             SecureBulkLoadProtos.DelegationToken.newBuilder()
129               .setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
130               .setPassword(ByteStringer.wrap(userToken.getPassword()))
131               .setKind(userToken.getKind().toString())
132               .setService(userToken.getService().toString()).build();
133       }
134 
135       List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths =
136           new ArrayList<ClientProtos.BulkLoadHFileRequest.FamilyPath>();
137       for(Pair<byte[], String> el: familyPaths) {
138         protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder()
139           .setFamily(ByteStringer.wrap(el.getFirst()))
140           .setPath(el.getSecond()).build());
141       }
142 
143       SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request =
144           SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder()
145             .setFsToken(protoDT)
146             .addAllFamilyPath(protoFamilyPaths)
147             .setBulkToken(bulkToken).build();
148 
149       ServerRpcController controller = new ServerRpcController();
150       BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse> rpcCallback =
151           new BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse>();
152       instance.secureBulkLoadHFiles(controller,
153         request,
154         rpcCallback);
155 
156       SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get();
157       if (controller.failedOnException()) {
158         throw controller.getFailedOn();
159       }
160       return response.getLoaded();
161     } catch (Throwable throwable) {
162       throw new IOException(throwable);
163     }
164   }
165 
166   public Path getStagingPath(String bulkToken, byte[] family) throws IOException {
167     return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), bulkToken, family);
168   }
169 }