1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client.coprocessor;
20
21 import org.apache.hadoop.hbase.client.Table;
22 import org.apache.hadoop.hbase.util.ByteStringer;
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.List;
26
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.hbase.HConstants;
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
32 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
33 import org.apache.hadoop.hbase.ipc.ServerRpcController;
34 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
35 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
36 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos;
37 import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
38 import org.apache.hadoop.hbase.util.Pair;
39 import org.apache.hadoop.security.token.Token;
40
41 import java.io.IOException;
42 import java.util.ArrayList;
43 import java.util.List;
44
45
46
47
48
49 @InterfaceAudience.Private
50 public class SecureBulkLoadClient {
51 private Table table;
52
53 public SecureBulkLoadClient(Table table) {
54 this.table = table;
55 }
56
57 public String prepareBulkLoad(final TableName tableName) throws IOException {
58 try {
59 CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
60 SecureBulkLoadProtos.SecureBulkLoadService instance =
61 ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
62
63 ServerRpcController controller = new ServerRpcController();
64
65 BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse> rpcCallback =
66 new BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse>();
67
68 SecureBulkLoadProtos.PrepareBulkLoadRequest request =
69 SecureBulkLoadProtos.PrepareBulkLoadRequest.newBuilder()
70 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
71
72 instance.prepareBulkLoad(controller,
73 request,
74 rpcCallback);
75
76 SecureBulkLoadProtos.PrepareBulkLoadResponse response = rpcCallback.get();
77 if (controller.failedOnException()) {
78 throw controller.getFailedOn();
79 }
80
81 return response.getBulkToken();
82 } catch (Throwable throwable) {
83 throw new IOException(throwable);
84 }
85 }
86
87 public void cleanupBulkLoad(final String bulkToken) throws IOException {
88 try {
89 CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
90 SecureBulkLoadProtos.SecureBulkLoadService instance =
91 ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
92
93 ServerRpcController controller = new ServerRpcController();
94
95 BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse> rpcCallback =
96 new BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse>();
97
98 SecureBulkLoadProtos.CleanupBulkLoadRequest request =
99 SecureBulkLoadProtos.CleanupBulkLoadRequest.newBuilder()
100 .setBulkToken(bulkToken).build();
101
102 instance.cleanupBulkLoad(controller,
103 request,
104 rpcCallback);
105
106 if (controller.failedOnException()) {
107 throw controller.getFailedOn();
108 }
109 } catch (Throwable throwable) {
110 throw new IOException(throwable);
111 }
112 }
113
114 public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
115 final Token<?> userToken,
116 final String bulkToken,
117 final byte[] startRow) throws IOException {
118
119
120 try {
121 CoprocessorRpcChannel channel = table.coprocessorService(startRow);
122 SecureBulkLoadProtos.SecureBulkLoadService instance =
123 ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
124
125 SecureBulkLoadProtos.DelegationToken protoDT =
126 SecureBulkLoadProtos.DelegationToken.newBuilder().build();
127 if(userToken != null) {
128 protoDT =
129 SecureBulkLoadProtos.DelegationToken.newBuilder()
130 .setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
131 .setPassword(ByteStringer.wrap(userToken.getPassword()))
132 .setKind(userToken.getKind().toString())
133 .setService(userToken.getService().toString()).build();
134 }
135
136 List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths =
137 new ArrayList<ClientProtos.BulkLoadHFileRequest.FamilyPath>();
138 for(Pair<byte[], String> el: familyPaths) {
139 protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder()
140 .setFamily(ByteStringer.wrap(el.getFirst()))
141 .setPath(el.getSecond()).build());
142 }
143
144 SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request =
145 SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder()
146 .setFsToken(protoDT)
147 .addAllFamilyPath(protoFamilyPaths)
148 .setBulkToken(bulkToken).build();
149
150 ServerRpcController controller = new ServerRpcController();
151 BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse> rpcCallback =
152 new BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse>();
153 instance.secureBulkLoadHFiles(controller,
154 request,
155 rpcCallback);
156
157 SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get();
158 if (controller.failedOnException()) {
159 throw controller.getFailedOn();
160 }
161 return response.getLoaded();
162 } catch (Throwable throwable) {
163 throw new IOException(throwable);
164 }
165 }
166
167 public Path getStagingPath(String bulkToken, byte[] family) throws IOException {
168 return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), bulkToken, family);
169 }
170 }