View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client.coprocessor;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.List;
24  
25  import org.apache.hadoop.fs.Path;
26  import org.apache.hadoop.hbase.HConstants;
27  import org.apache.hadoop.hbase.TableName;
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.hbase.client.Table;
30  import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
31  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
32  import org.apache.hadoop.hbase.ipc.ServerRpcController;
33  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
34  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
35  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos;
36  import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
37  import org.apache.hadoop.hbase.util.ByteStringer;
38  import org.apache.hadoop.hbase.util.Pair;
39  import org.apache.hadoop.security.token.Token;
40  
41  /**
42   * Client proxy for SecureBulkLoadProtocol
43   * used in conjunction with SecureBulkLoadEndpoint
44   */
45  @InterfaceAudience.Private
46  public class SecureBulkLoadClient {
47    private Table table;
48  
49    public SecureBulkLoadClient(Table table) {
50      this.table = table;
51    }
52  
53    public String prepareBulkLoad(final TableName tableName) throws IOException {
54      try {
55        CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
56        SecureBulkLoadProtos.SecureBulkLoadService instance =
57            ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
58  
59        ServerRpcController controller = new ServerRpcController();
60  
61        BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse> rpcCallback =
62            new BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse>();
63  
64        SecureBulkLoadProtos.PrepareBulkLoadRequest request =
65            SecureBulkLoadProtos.PrepareBulkLoadRequest.newBuilder()
66            .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
67  
68        instance.prepareBulkLoad(controller,
69            request,
70            rpcCallback);
71  
72        SecureBulkLoadProtos.PrepareBulkLoadResponse response = rpcCallback.get();
73        if (controller.failedOnException()) {
74          throw controller.getFailedOn();
75        }
76        
77        return response.getBulkToken();
78      } catch (Throwable throwable) {
79        throw new IOException(throwable);
80      }
81    }
82  
83    public void cleanupBulkLoad(final String bulkToken) throws IOException {
84      try {
85        CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
86        SecureBulkLoadProtos.SecureBulkLoadService instance =
87            ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
88  
89        ServerRpcController controller = new ServerRpcController();
90  
91        BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse> rpcCallback =
92            new BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse>();
93  
94        SecureBulkLoadProtos.CleanupBulkLoadRequest request =
95            SecureBulkLoadProtos.CleanupBulkLoadRequest.newBuilder()
96                .setBulkToken(bulkToken).build();
97  
98        instance.cleanupBulkLoad(controller,
99            request,
100           rpcCallback);
101 
102       if (controller.failedOnException()) {
103         throw controller.getFailedOn();
104       }
105     } catch (Throwable throwable) {
106       throw new IOException(throwable);
107     }
108   }
109 
110   public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
111                          final Token<?> userToken,
112                          final String bulkToken,
113                          final byte[] startRow) throws IOException {
114     // we never want to send a batch of HFiles to all regions, thus cannot call
115     // HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639
116     try {
117       CoprocessorRpcChannel channel = table.coprocessorService(startRow);
118       SecureBulkLoadProtos.SecureBulkLoadService instance =
119           ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
120 
121       SecureBulkLoadProtos.DelegationToken protoDT =
122           SecureBulkLoadProtos.DelegationToken.newBuilder().build();
123       if(userToken != null) {
124         protoDT =
125             SecureBulkLoadProtos.DelegationToken.newBuilder()
126               .setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
127               .setPassword(ByteStringer.wrap(userToken.getPassword()))
128               .setKind(userToken.getKind().toString())
129               .setService(userToken.getService().toString()).build();
130       }
131 
132       List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths =
133           new ArrayList<ClientProtos.BulkLoadHFileRequest.FamilyPath>();
134       for(Pair<byte[], String> el: familyPaths) {
135         protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder()
136           .setFamily(ByteStringer.wrap(el.getFirst()))
137           .setPath(el.getSecond()).build());
138       }
139 
140       SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request =
141           SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder()
142             .setFsToken(protoDT)
143             .addAllFamilyPath(protoFamilyPaths)
144             .setBulkToken(bulkToken).build();
145 
146       ServerRpcController controller = new ServerRpcController();
147       BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse> rpcCallback =
148           new BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse>();
149       instance.secureBulkLoadHFiles(controller,
150         request,
151         rpcCallback);
152 
153       SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get();
154       if (controller.failedOnException()) {
155         throw controller.getFailedOn();
156       }
157       return response.getLoaded();
158     } catch (Throwable throwable) {
159       throw new IOException(throwable);
160     }
161   }
162 
163   public Path getStagingPath(String bulkToken, byte[] family) throws IOException {
164     return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), bulkToken, family);
165   }
166 }