View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client.coprocessor;
20  
21  import org.apache.hadoop.hbase.client.Table;
22  import org.apache.hadoop.hbase.util.ByteStringer;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.fs.Path;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.TableName;
31  import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
32  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
33  import org.apache.hadoop.hbase.ipc.ServerRpcController;
34  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
35  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
36  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos;
37  import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
38  import org.apache.hadoop.hbase.util.Pair;
39  import org.apache.hadoop.security.token.Token;
40  
41  import java.io.IOException;
42  import java.util.ArrayList;
43  import java.util.List;
44  
45  /**
46   * Client proxy for SecureBulkLoadProtocol
47   * used in conjunction with SecureBulkLoadEndpoint
48   */
49  @InterfaceAudience.Private
50  public class SecureBulkLoadClient {
51    private Table table;
52  
53    public SecureBulkLoadClient(Table table) {
54      this.table = table;
55    }
56  
57    public String prepareBulkLoad(final TableName tableName) throws IOException {
58      try {
59        CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
60        SecureBulkLoadProtos.SecureBulkLoadService instance =
61            ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
62  
63        ServerRpcController controller = new ServerRpcController();
64  
65        BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse> rpcCallback =
66            new BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse>();
67  
68        SecureBulkLoadProtos.PrepareBulkLoadRequest request =
69            SecureBulkLoadProtos.PrepareBulkLoadRequest.newBuilder()
70            .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
71  
72        instance.prepareBulkLoad(controller,
73            request,
74            rpcCallback);
75  
76        SecureBulkLoadProtos.PrepareBulkLoadResponse response = rpcCallback.get();
77        if (controller.failedOnException()) {
78          throw controller.getFailedOn();
79        }
80  
81        return response.getBulkToken();
82      } catch (Throwable throwable) {
83        if (throwable instanceof IOException) {
84          throw (IOException) throwable;
85        }
86        throw new IOException(throwable);
87      }
88    }
89  
90    public void cleanupBulkLoad(final String bulkToken) throws IOException {
91      try {
92        CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
93        SecureBulkLoadProtos.SecureBulkLoadService instance =
94            ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
95  
96        ServerRpcController controller = new ServerRpcController();
97  
98        BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse> rpcCallback =
99            new BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse>();
100 
101       SecureBulkLoadProtos.CleanupBulkLoadRequest request =
102           SecureBulkLoadProtos.CleanupBulkLoadRequest.newBuilder()
103               .setBulkToken(bulkToken).build();
104 
105       instance.cleanupBulkLoad(controller,
106           request,
107           rpcCallback);
108 
109       if (controller.failedOnException()) {
110         throw controller.getFailedOn();
111       }
112     } catch (Throwable throwable) {
113       if (throwable instanceof IOException) {
114         throw (IOException) throwable;
115       }
116       throw new IOException(throwable);
117     }
118   }
119 
120   public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
121                          final Token<?> userToken,
122                          final String bulkToken,
123                          final byte[] startRow) throws IOException {
124     // we never want to send a batch of HFiles to all regions, thus cannot call
125     // HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639
126     try {
127       CoprocessorRpcChannel channel = table.coprocessorService(startRow);
128       SecureBulkLoadProtos.SecureBulkLoadService instance =
129           ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
130 
131       SecureBulkLoadProtos.DelegationToken protoDT =
132           SecureBulkLoadProtos.DelegationToken.newBuilder().build();
133       if(userToken != null) {
134         protoDT =
135             SecureBulkLoadProtos.DelegationToken.newBuilder()
136               .setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
137               .setPassword(ByteStringer.wrap(userToken.getPassword()))
138               .setKind(userToken.getKind().toString())
139               .setService(userToken.getService().toString()).build();
140       }
141 
142       List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths =
143           new ArrayList<ClientProtos.BulkLoadHFileRequest.FamilyPath>();
144       for(Pair<byte[], String> el: familyPaths) {
145         protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder()
146           .setFamily(ByteStringer.wrap(el.getFirst()))
147           .setPath(el.getSecond()).build());
148       }
149 
150       SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request =
151           SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder()
152             .setFsToken(protoDT)
153             .addAllFamilyPath(protoFamilyPaths)
154             .setBulkToken(bulkToken).build();
155 
156       ServerRpcController controller = new ServerRpcController();
157       BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse> rpcCallback =
158           new BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse>();
159       instance.secureBulkLoadHFiles(controller,
160         request,
161         rpcCallback);
162 
163       SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get();
164       if (controller.failedOnException()) {
165         throw controller.getFailedOn();
166       }
167       return response.getLoaded();
168     } catch (Throwable throwable) {
169       if (throwable instanceof IOException) {
170         throw (IOException) throwable;
171       }
172       throw new IOException(throwable);
173     }
174   }
175 
176   public Path getStagingPath(String bulkToken, byte[] family) throws IOException {
177     return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), bulkToken, family);
178   }
179 }