1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client.coprocessor;
20
21 import org.apache.hadoop.hbase.client.Table;
22 import org.apache.hadoop.hbase.util.ByteStringer;
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.List;
26
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.hbase.HConstants;
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
32 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
33 import org.apache.hadoop.hbase.ipc.ServerRpcController;
34 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
35 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
36 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos;
37 import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
38 import org.apache.hadoop.hbase.util.Pair;
39 import org.apache.hadoop.security.token.Token;
40
41 import java.io.IOException;
42 import java.util.ArrayList;
43 import java.util.List;
44
45
46
47
48
49 @InterfaceAudience.Private
50 public class SecureBulkLoadClient {
51 private Table table;
52
53 public SecureBulkLoadClient(Table table) {
54 this.table = table;
55 }
56
57 public String prepareBulkLoad(final TableName tableName) throws IOException {
58 try {
59 CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
60 SecureBulkLoadProtos.SecureBulkLoadService instance =
61 ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
62
63 ServerRpcController controller = new ServerRpcController();
64
65 BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse> rpcCallback =
66 new BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse>();
67
68 SecureBulkLoadProtos.PrepareBulkLoadRequest request =
69 SecureBulkLoadProtos.PrepareBulkLoadRequest.newBuilder()
70 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
71
72 instance.prepareBulkLoad(controller,
73 request,
74 rpcCallback);
75
76 SecureBulkLoadProtos.PrepareBulkLoadResponse response = rpcCallback.get();
77 if (controller.failedOnException()) {
78 throw controller.getFailedOn();
79 }
80
81 return response.getBulkToken();
82 } catch (Throwable throwable) {
83 if (throwable instanceof IOException) {
84 throw (IOException) throwable;
85 }
86 throw new IOException(throwable);
87 }
88 }
89
90 public void cleanupBulkLoad(final String bulkToken) throws IOException {
91 try {
92 CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
93 SecureBulkLoadProtos.SecureBulkLoadService instance =
94 ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
95
96 ServerRpcController controller = new ServerRpcController();
97
98 BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse> rpcCallback =
99 new BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse>();
100
101 SecureBulkLoadProtos.CleanupBulkLoadRequest request =
102 SecureBulkLoadProtos.CleanupBulkLoadRequest.newBuilder()
103 .setBulkToken(bulkToken).build();
104
105 instance.cleanupBulkLoad(controller,
106 request,
107 rpcCallback);
108
109 if (controller.failedOnException()) {
110 throw controller.getFailedOn();
111 }
112 } catch (Throwable throwable) {
113 if (throwable instanceof IOException) {
114 throw (IOException) throwable;
115 }
116 throw new IOException(throwable);
117 }
118 }
119
120 public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
121 final Token<?> userToken,
122 final String bulkToken,
123 final byte[] startRow) throws IOException {
124
125
126 try {
127 CoprocessorRpcChannel channel = table.coprocessorService(startRow);
128 SecureBulkLoadProtos.SecureBulkLoadService instance =
129 ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
130
131 SecureBulkLoadProtos.DelegationToken protoDT =
132 SecureBulkLoadProtos.DelegationToken.newBuilder().build();
133 if(userToken != null) {
134 protoDT =
135 SecureBulkLoadProtos.DelegationToken.newBuilder()
136 .setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
137 .setPassword(ByteStringer.wrap(userToken.getPassword()))
138 .setKind(userToken.getKind().toString())
139 .setService(userToken.getService().toString()).build();
140 }
141
142 List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths =
143 new ArrayList<ClientProtos.BulkLoadHFileRequest.FamilyPath>();
144 for(Pair<byte[], String> el: familyPaths) {
145 protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder()
146 .setFamily(ByteStringer.wrap(el.getFirst()))
147 .setPath(el.getSecond()).build());
148 }
149
150 SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request =
151 SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder()
152 .setFsToken(protoDT)
153 .addAllFamilyPath(protoFamilyPaths)
154 .setBulkToken(bulkToken).build();
155
156 ServerRpcController controller = new ServerRpcController();
157 BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse> rpcCallback =
158 new BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse>();
159 instance.secureBulkLoadHFiles(controller,
160 request,
161 rpcCallback);
162
163 SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get();
164 if (controller.failedOnException()) {
165 throw controller.getFailedOn();
166 }
167 return response.getLoaded();
168 } catch (Throwable throwable) {
169 if (throwable instanceof IOException) {
170 throw (IOException) throwable;
171 }
172 throw new IOException(throwable);
173 }
174 }
175
176 public Path getStagingPath(String bulkToken, byte[] family) throws IOException {
177 return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), bulkToken, family);
178 }
179 }