1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.security.access;
20
21 import com.google.common.annotations.VisibleForTesting;
22 import com.google.protobuf.RpcCallback;
23 import com.google.protobuf.RpcController;
24 import com.google.protobuf.Service;
25
26 import org.apache.commons.lang.StringUtils;
27 import org.apache.commons.lang.mutable.MutableInt;
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.FileStatus;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.fs.FileUtil;
35 import org.apache.hadoop.fs.Path;
36 import org.apache.hadoop.fs.permission.FsPermission;
37 import org.apache.hadoop.hbase.Coprocessor;
38 import org.apache.hadoop.hbase.CoprocessorEnvironment;
39 import org.apache.hadoop.hbase.TableName;
40 import org.apache.hadoop.hbase.DoNotRetryIOException;
41 import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
42 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
43 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
44 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
45 import org.apache.hadoop.hbase.ipc.RpcServer;
46 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
47 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
48 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
49 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadService;
50 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
51 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadResponse;
52 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
53 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadResponse;
54 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesRequest;
55 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesResponse;
56 import org.apache.hadoop.hbase.regionserver.Region;
57 import org.apache.hadoop.hbase.regionserver.Region.BulkLoadListener;
58 import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
59 import org.apache.hadoop.hbase.security.User;
60 import org.apache.hadoop.hbase.security.UserProvider;
61 import org.apache.hadoop.hbase.security.token.FsDelegationToken;
62 import org.apache.hadoop.hbase.util.Bytes;
63 import org.apache.hadoop.hbase.util.FSHDFSUtils;
64 import org.apache.hadoop.hbase.util.Methods;
65 import org.apache.hadoop.hbase.util.Pair;
66 import org.apache.hadoop.io.Text;
67 import org.apache.hadoop.security.UserGroupInformation;
68 import org.apache.hadoop.security.token.Token;
69
70 import java.io.IOException;
71 import java.math.BigInteger;
72 import java.security.PrivilegedAction;
73 import java.security.SecureRandom;
74 import java.util.ArrayList;
75 import java.util.Arrays;
76 import java.util.HashMap;
77 import java.util.HashSet;
78 import java.util.List;
79 import java.util.Map;
80 import java.util.Set;
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108 @InterfaceAudience.Private
109 public class SecureBulkLoadEndpoint extends SecureBulkLoadService
110 implements CoprocessorService, Coprocessor {
111
112 public static final long VERSION = 0L;
113
114
115 private static final int RANDOM_WIDTH = 320;
116 private static final int RANDOM_RADIX = 32;
117
118 private static final Log LOG = LogFactory.getLog(SecureBulkLoadEndpoint.class);
119
120 private final static FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");
121 private final static FsPermission PERM_HIDDEN = FsPermission.valueOf("-rwx--x--x");
122
123 public static final String FS_WITHOUT_SUPPORT_PERMISSION_KEY =
124 "hbase.secure.bulkload.fs.permission.lacking";
125 public static final String FS_WITHOUT_SUPPORT_PERMISSION_DEFAULT =
126 "s3,s3a,s3n,wasb,wasbs,swift,adfs,abfs,viewfs";
127
128 private SecureRandom random;
129 private FileSystem fs;
130 private Configuration conf;
131
132
133
134 private Path baseStagingDir;
135
136 private RegionCoprocessorEnvironment env;
137
138 private UserProvider userProvider;
139 private static HashMap<UserGroupInformation, MutableInt> ugiReferenceCounter = new HashMap<>();
140
141 @Override
142 public void start(CoprocessorEnvironment env) {
143 this.env = (RegionCoprocessorEnvironment)env;
144 random = new SecureRandom();
145 conf = env.getConfiguration();
146 baseStagingDir = SecureBulkLoadUtil.getBaseStagingDir(conf);
147 this.userProvider = UserProvider.instantiate(conf);
148 Set<String> fsSet = getFileSystemSchemesWithoutPermissionSupport(conf);
149
150 try {
151 fs = baseStagingDir.getFileSystem(conf);
152 if (!fs.exists(baseStagingDir)) {
153 fs.mkdirs(baseStagingDir, PERM_HIDDEN);
154 }
155 FileStatus status = fs.getFileStatus(baseStagingDir);
156 if (status == null) {
157 throw new IllegalStateException("Failed to create staging directory");
158 }
159
160
161
162 if (!status.getPermission().equals(PERM_HIDDEN)) {
163 fs.setPermission(baseStagingDir, PERM_HIDDEN);
164 status = fs.getFileStatus(baseStagingDir);
165 }
166
167
168 Path doNotEraseDir = new Path(baseStagingDir, "DONOTERASE");
169 if (!fs.exists(doNotEraseDir)) {
170 fs.mkdirs(doNotEraseDir, PERM_HIDDEN);
171 fs.setPermission(doNotEraseDir, PERM_HIDDEN);
172 }
173
174 String scheme = fs.getScheme().toLowerCase();
175 if (!fsSet.contains(scheme) && !status.getPermission().equals(PERM_HIDDEN)) {
176 throw new IllegalStateException(
177 "Staging directory of " + baseStagingDir + " already exists but permissions aren't set to '-rwx--x--x' ");
178 }
179 } catch (IOException e) {
180 throw new IllegalStateException("Failed to get FileSystem instance",e);
181 }
182 }
183
184 Set<String> getFileSystemSchemesWithoutPermissionSupport(Configuration conf) {
185 final String value = conf.get(
186 FS_WITHOUT_SUPPORT_PERMISSION_KEY, FS_WITHOUT_SUPPORT_PERMISSION_DEFAULT);
187 return new HashSet<String>(Arrays.asList(StringUtils.split(value, ',')));
188 }
189
190 @Override
191 public void stop(CoprocessorEnvironment env) throws IOException {
192 }
193
194 @Override
195 public void prepareBulkLoad(RpcController controller,
196 PrepareBulkLoadRequest request,
197 RpcCallback<PrepareBulkLoadResponse> done){
198 try {
199 List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers();
200
201 if(bulkLoadObservers != null) {
202 ObserverContext<RegionCoprocessorEnvironment> ctx =
203 new ObserverContext<RegionCoprocessorEnvironment>();
204 ctx.prepare(env);
205
206 for(BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
207 bulkLoadObserver.prePrepareBulkLoad(ctx, request);
208 }
209 }
210
211 String bulkToken = createStagingDir(baseStagingDir,
212 getActiveUser(), ProtobufUtil.toTableName(request.getTableName())).toString();
213 done.run(PrepareBulkLoadResponse.newBuilder().setBulkToken(bulkToken).build());
214 } catch (IOException e) {
215 ResponseConverter.setControllerException(controller, e);
216 }
217 done.run(null);
218 }
219
220 @Override
221 public void cleanupBulkLoad(RpcController controller,
222 CleanupBulkLoadRequest request,
223 RpcCallback<CleanupBulkLoadResponse> done) {
224 try {
225 List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers();
226
227 if(bulkLoadObservers != null) {
228 ObserverContext<RegionCoprocessorEnvironment> ctx =
229 new ObserverContext<RegionCoprocessorEnvironment>();
230 ctx.prepare(env);
231
232 for(BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
233 bulkLoadObserver.preCleanupBulkLoad(ctx, request);
234 }
235 }
236
237 fs.delete(new Path(request.getBulkToken()), true);
238 done.run(CleanupBulkLoadResponse.newBuilder().build());
239 } catch (IOException e) {
240 ResponseConverter.setControllerException(controller, e);
241 } finally {
242 UserGroupInformation ugi = getActiveUser().getUGI();
243 try {
244 if (!UserGroupInformation.getLoginUser().equals(ugi) && !isUserReferenced(ugi)) {
245 FileSystem.closeAllForUGI(ugi);
246 }
247 } catch (IOException e) {
248 LOG.error("Failed to close FileSystem for: " + ugi, e);
249 }
250 }
251 done.run(null);
252 }
253
254 @VisibleForTesting
255 interface Consumer<T> {
256 void accept(T t);
257 }
258 private static Consumer<Region> fsCreatedListener;
259
260 @VisibleForTesting
261 static void setFsCreatedListener(Consumer<Region> listener) {
262 fsCreatedListener = listener;
263 }
264
265
266 private void incrementUgiReference(UserGroupInformation ugi) {
267 synchronized (ugiReferenceCounter) {
268 final MutableInt counter = ugiReferenceCounter.get(ugi);
269 if (counter == null) {
270 ugiReferenceCounter.put(ugi, new MutableInt(1));
271 } else {
272 counter.increment();
273 }
274 }
275 }
276
277 private void decrementUgiReference(UserGroupInformation ugi) {
278 synchronized (ugiReferenceCounter) {
279 final MutableInt counter = ugiReferenceCounter.get(ugi);
280 if(counter == null || counter.intValue() <= 1) {
281 ugiReferenceCounter.remove(ugi);
282 } else {
283 counter.decrement();
284 }
285 }
286 }
287
288 private boolean isUserReferenced(UserGroupInformation ugi) {
289 synchronized (ugiReferenceCounter) {
290 final MutableInt counter = ugiReferenceCounter.get(ugi);
291 return counter != null && counter.intValue() > 0;
292 }
293 }
294
295 @Override
296 public void secureBulkLoadHFiles(RpcController controller,
297 SecureBulkLoadHFilesRequest request,
298 RpcCallback<SecureBulkLoadHFilesResponse> done) {
299 final List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
300 for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
301 familyPaths.add(new Pair(el.getFamily().toByteArray(),el.getPath()));
302 }
303
304 Token userToken = null;
305 if (userProvider.isHadoopSecurityEnabled()) {
306 userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), request.getFsToken()
307 .getPassword().toByteArray(), new Text(request.getFsToken().getKind()), new Text(
308 request.getFsToken().getService()));
309 }
310 final String bulkToken = request.getBulkToken();
311 User user = getActiveUser();
312 final UserGroupInformation ugi = user.getUGI();
313 if(userToken != null) {
314 ugi.addToken(userToken);
315 } else if (userProvider.isHadoopSecurityEnabled()) {
316
317
318 ResponseConverter.setControllerException(controller,
319 new DoNotRetryIOException("User token cannot be null"));
320 done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build());
321 return;
322 }
323
324 Region region = env.getRegion();
325 boolean bypass = false;
326 if (region.getCoprocessorHost() != null) {
327 try {
328 bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
329 } catch (IOException e) {
330 ResponseConverter.setControllerException(controller, e);
331 done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build());
332 return;
333 }
334 }
335 boolean loaded = false;
336 if (!bypass) {
337
338
339
340
341
342 if (userProvider.isHadoopSecurityEnabled()) {
343 FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider, "renewer");
344 try {
345 targetfsDelegationToken.acquireDelegationToken(fs);
346 } catch (IOException e) {
347 ResponseConverter.setControllerException(controller, e);
348 done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build());
349 return;
350 }
351 Token<?> targetFsToken = targetfsDelegationToken.getUserToken();
352 if (targetFsToken != null
353 && (userToken == null || !targetFsToken.getService().equals(userToken.getService()))) {
354 ugi.addToken(targetFsToken);
355 }
356 }
357
358 incrementUgiReference(ugi);
359 loaded = ugi.doAs(new PrivilegedAction<Boolean>() {
360 @Override
361 public Boolean run() {
362 FileSystem fs = null;
363 try {
364 Configuration conf = env.getConfiguration();
365 fs = FileSystem.get(conf);
366 for(Pair<byte[], String> el: familyPaths) {
367 Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
368 if(!fs.exists(stageFamily)) {
369 fs.mkdirs(stageFamily);
370 fs.setPermission(stageFamily, PERM_ALL_ACCESS);
371 }
372 }
373 if (fsCreatedListener != null) {
374 fsCreatedListener.accept(env.getRegion());
375 }
376
377
378 return env.getRegion().bulkLoadHFiles(familyPaths, true,
379 new SecureBulkLoadListener(fs, bulkToken, conf));
380 } catch (Exception e) {
381 LOG.error("Failed to complete bulk load", e);
382 }
383 return false;
384 }
385 });
386 decrementUgiReference(ugi);
387 }
388 if (region.getCoprocessorHost() != null) {
389 try {
390 loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
391 } catch (IOException e) {
392 ResponseConverter.setControllerException(controller, e);
393 done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build());
394 return;
395 }
396 }
397 done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(loaded).build());
398 }
399
400 private List<BulkLoadObserver> getBulkLoadObservers() {
401 List<BulkLoadObserver> coprocessorList =
402 this.env.getRegion().getCoprocessorHost().findCoprocessors(BulkLoadObserver.class);
403
404 return coprocessorList;
405 }
406
407 private Path createStagingDir(Path baseDir,
408 User user,
409 TableName tableName) throws IOException {
410 String tblName = tableName.getNameAsString().replace(":", "_");
411 String randomDir = user.getShortName()+"__"+ tblName +"__"+
412 (new BigInteger(RANDOM_WIDTH, random).toString(RANDOM_RADIX));
413 return createStagingDir(baseDir, user, randomDir);
414 }
415
416 private Path createStagingDir(Path baseDir,
417 User user,
418 String randomDir) throws IOException {
419 Path p = new Path(baseDir, randomDir);
420 fs.mkdirs(p, PERM_ALL_ACCESS);
421 fs.setPermission(p, PERM_ALL_ACCESS);
422 return p;
423 }
424
425 private User getActiveUser() {
426 User user = RpcServer.getRequestUser();
427 if (user == null) {
428 return null;
429 }
430
431
432 if (userProvider.isHadoopSecurityEnabled()
433 && "simple".equalsIgnoreCase(conf.get(User.HBASE_SECURITY_CONF_KEY))) {
434 return User.createUserForTesting(conf, user.getShortName(), new String[]{});
435 }
436
437 return user;
438 }
439
440 @Override
441 public Service getService() {
442 return this;
443 }
444
445 private static class SecureBulkLoadListener implements BulkLoadListener {
446
447 private FileSystem fs;
448 private String stagingDir;
449 private Configuration conf;
450
451 private FileSystem srcFs = null;
452 private Map<String, FsPermission> origPermissions = null;
453
454 public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) {
455 this.fs = fs;
456 this.stagingDir = stagingDir;
457 this.conf = conf;
458 this.origPermissions = new HashMap<String, FsPermission>();
459 }
460
461 @Override
462 public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException {
463 Path p = new Path(srcPath);
464 Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));
465 if (srcFs == null) {
466 srcFs = FileSystem.newInstance(p.toUri(), conf);
467 }
468
469 if(!isFile(p)) {
470 throw new IOException("Path does not reference a file: " + p);
471 }
472
473
474 if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
475 LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " +
476 "the destination filesystem. Copying file over to destination staging dir.");
477 FileUtil.copy(srcFs, p, fs, stageP, false, conf);
478 } else {
479 LOG.debug("Moving " + p + " to " + stageP);
480 FileStatus origFileStatus = fs.getFileStatus(p);
481 origPermissions.put(srcPath, origFileStatus.getPermission());
482 if(!fs.rename(p, stageP)) {
483 throw new IOException("Failed to move HFile: " + p + " to " + stageP);
484 }
485 }
486 fs.setPermission(stageP, PERM_ALL_ACCESS);
487 return stageP.toString();
488 }
489
490 @Override
491 public void doneBulkLoad(byte[] family, String srcPath) throws IOException {
492 LOG.debug("Bulk Load done for: " + srcPath);
493 closeSrcFs();
494 }
495
496 private void closeSrcFs() throws IOException {
497 if (srcFs != null) {
498 srcFs.close();
499 srcFs = null;
500 }
501 }
502
503 @Override
504 public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException {
505 try {
506 Path p = new Path(srcPath);
507 if (srcFs == null) {
508 srcFs = FileSystem.newInstance(p.toUri(), conf);
509 }
510 if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
511
512 return;
513 }
514 Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));
515 LOG.debug("Moving " + stageP + " back to " + p);
516 if (!fs.rename(stageP, p))
517 throw new IOException("Failed to move HFile: " + stageP + " to " + p);
518
519
520 if (origPermissions.containsKey(srcPath)) {
521 fs.setPermission(p, origPermissions.get(srcPath));
522 } else {
523 LOG.warn("Can't find previous permission for path=" + srcPath);
524 }
525 } finally {
526 closeSrcFs();
527 }
528 }
529
530
531
532
533
534
535
536
537 private boolean isFile(Path p) throws IOException {
538 FileStatus status = srcFs.getFileStatus(p);
539 boolean isFile = !status.isDirectory();
540 try {
541 isFile = isFile && !(Boolean)Methods.call(FileStatus.class, status, "isSymlink", null, null);
542 } catch (Exception e) {
543 }
544 return isFile;
545 }
546 }
547 }