001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.math.BigInteger;
022import java.security.PrivilegedAction;
023import java.security.SecureRandom;
024import java.util.ArrayList;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.function.Consumer;
030import org.apache.commons.lang3.StringUtils;
031import org.apache.commons.lang3.mutable.MutableInt;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileStatus;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.FileUtil;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.fs.permission.FsPermission;
038import org.apache.hadoop.hbase.DoNotRetryIOException;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.AsyncConnection;
042import org.apache.hadoop.hbase.ipc.RpcServer;
043import org.apache.hadoop.hbase.regionserver.HRegion.BulkLoadListener;
044import org.apache.hadoop.hbase.security.User;
045import org.apache.hadoop.hbase.security.UserProvider;
046import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
047import org.apache.hadoop.hbase.security.token.ClientTokenUtil;
048import org.apache.hadoop.hbase.security.token.FsDelegationToken;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.apache.hadoop.hbase.util.CommonFSUtils;
051import org.apache.hadoop.hbase.util.FSUtils;
052import org.apache.hadoop.hbase.util.Methods;
053import org.apache.hadoop.hbase.util.Pair;
054import org.apache.hadoop.io.Text;
055import org.apache.hadoop.security.UserGroupInformation;
056import org.apache.hadoop.security.token.Token;
057import org.apache.yetus.audience.InterfaceAudience;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
065
066/**
067 * Bulk loads in secure mode. This service addresses two issues:
068 * <ol>
069 * <li>Moving files in a secure filesystem wherein the HBase Client and HBase Server are different
070 * filesystem users.</li>
071 * <li>Does moving in a secure manner. Assuming that the filesystem is POSIX compliant.</li>
072 * </ol>
073 * The algorithm is as follows:
074 * <ol>
075 * <li>Create an hbase owned staging directory which is world traversable (711):
076 * {@code /hbase/staging}</li>
077 * <li>A user writes out data to his secure output directory: {@code /user/foo/data}</li>
078 * <li>A call is made to hbase to create a secret staging directory which is globally rwx (777):
079 * {@code /user/staging/averylongandrandomdirectoryname}</li>
080 * <li>The user moves the data into the random staging directory, then calls bulkLoadHFiles()</li>
081 * </ol>
082 * Like delegation tokens the strength of the security lies in the length and randomness of the
083 * secret directory.
084 */
085@InterfaceAudience.Private
086public class SecureBulkLoadManager {
087
088  public static final long VERSION = 0L;
089
090  // 320/5 = 64 characters
091  private static final int RANDOM_WIDTH = 320;
092  private static final int RANDOM_RADIX = 32;
093
094  private static final Logger LOG = LoggerFactory.getLogger(SecureBulkLoadManager.class);
095
096  private final static FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");
097  private final static FsPermission PERM_HIDDEN = FsPermission.valueOf("-rwx--x--x");
098  private SecureRandom random;
099  private FileSystem fs;
100  private Configuration conf;
101
102  // two levels so it doesn't get deleted accidentally
103  // no sticky bit in Hadoop 1.0
104  private Path baseStagingDir;
105
106  private UserProvider userProvider;
107  private ConcurrentHashMap<UserGroupInformation, MutableInt> ugiReferenceCounter;
108  private AsyncConnection conn;
109
110  SecureBulkLoadManager(Configuration conf, AsyncConnection conn) {
111    this.conf = conf;
112    this.conn = conn;
113  }
114
115  public void start() throws IOException {
116    random = new SecureRandom();
117    userProvider = UserProvider.instantiate(conf);
118    ugiReferenceCounter = new ConcurrentHashMap<>();
119    fs = FileSystem.get(conf);
120    baseStagingDir = new Path(CommonFSUtils.getRootDir(conf), HConstants.BULKLOAD_STAGING_DIR_NAME);
121
122    if (conf.get("hbase.bulkload.staging.dir") != null) {
123      LOG.warn("hbase.bulkload.staging.dir " + " is deprecated. Bulkload staging directory is "
124        + baseStagingDir);
125    }
126    if (!fs.exists(baseStagingDir)) {
127      fs.mkdirs(baseStagingDir, PERM_HIDDEN);
128      if (!PERM_HIDDEN.equals(PERM_HIDDEN.applyUMask(FsPermission.getUMask(conf)))) {
129        LOG.info("Modifying permissions to " + PERM_HIDDEN);
130        fs.setPermission(baseStagingDir, PERM_HIDDEN);
131      }
132    }
133  }
134
135  public void stop() throws IOException {
136  }
137
138  public String prepareBulkLoad(final HRegion region, final PrepareBulkLoadRequest request)
139    throws IOException {
140    User user = getActiveUser();
141    region.getCoprocessorHost().prePrepareBulkLoad(user);
142
143    String bulkToken =
144      createStagingDir(baseStagingDir, user, region.getTableDescriptor().getTableName()).toString();
145
146    return bulkToken;
147  }
148
149  public void cleanupBulkLoad(final HRegion region, final CleanupBulkLoadRequest request)
150    throws IOException {
151    region.getCoprocessorHost().preCleanupBulkLoad(getActiveUser());
152
153    Path path = new Path(request.getBulkToken());
154    validateStagingPath(path);
155    if (!fs.delete(path, true)) {
156      if (fs.exists(path)) {
157        throw new IOException("Failed to clean up " + path);
158      }
159    }
160    LOG.trace("Cleaned up {} successfully.", path);
161  }
162
163  /**
164   * Verify that the given path is a direct child of the staging directory. Rejects path traversal
165   * attempts and paths outside the expected staging area.
166   */
167  void validateStagingPath(Path path) throws IOException {
168    Path qualified = path.makeQualified(fs.getUri(), fs.getWorkingDirectory());
169    Path qualifiedBase = baseStagingDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
170    if (qualified.getParent() == null || !qualified.getParent().equals(qualifiedBase)) {
171      throw new DoNotRetryIOException(
172        "Bulk load token path must be a direct child of the staging directory: " + baseStagingDir);
173    }
174  }
175
176  private Consumer<HRegion> fsCreatedListener;
177
178  void setFsCreatedListener(Consumer<HRegion> fsCreatedListener) {
179    this.fsCreatedListener = fsCreatedListener;
180  }
181
182  private void incrementUgiReference(UserGroupInformation ugi) {
183    // if we haven't seen this ugi before, make a new counter
184    ugiReferenceCounter.compute(ugi, (key, value) -> {
185      if (value == null) {
186        value = new MutableInt(1);
187      } else {
188        value.increment();
189      }
190      return value;
191    });
192  }
193
194  private void decrementUgiReference(UserGroupInformation ugi) {
195    // if the count drops below 1 we remove the entry by returning null
196    ugiReferenceCounter.computeIfPresent(ugi, (key, value) -> {
197      if (value.intValue() > 1) {
198        value.decrement();
199      } else {
200        value = null;
201      }
202      return value;
203    });
204  }
205
206  private boolean isUserReferenced(UserGroupInformation ugi) {
207    // if the ugi is in the map, based on invariants above
208    // the count must be above zero
209    return ugiReferenceCounter.containsKey(ugi);
210  }
211
212  public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
213    final BulkLoadHFileRequest request) throws IOException {
214    return secureBulkLoadHFiles(region, request, null);
215  }
216
217  public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
218    final BulkLoadHFileRequest request, List<String> clusterIds) throws IOException {
219    final List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
220    for (ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
221      familyPaths.add(new Pair<>(el.getFamily().toByteArray(), el.getPath()));
222    }
223
224    Token<AuthenticationTokenIdentifier> userToken = null;
225    if (userProvider.isHadoopSecurityEnabled()) {
226      userToken = new Token<>(request.getFsToken().getIdentifier().toByteArray(),
227        request.getFsToken().getPassword().toByteArray(), new Text(request.getFsToken().getKind()),
228        new Text(request.getFsToken().getService()));
229    }
230    final String bulkToken = request.getBulkToken();
231    User user = getActiveUser();
232    final UserGroupInformation ugi = user.getUGI();
233    if (userProvider.isHadoopSecurityEnabled()) {
234      try {
235        Token<AuthenticationTokenIdentifier> tok = ClientTokenUtil.obtainToken(conn).get();
236        if (tok != null) {
237          boolean b = ugi.addToken(tok);
238          LOG.debug("token added " + tok + " for user " + ugi + " return=" + b);
239        }
240      } catch (Exception ioe) {
241        LOG.warn("unable to add token", ioe);
242      }
243    }
244    if (userToken != null) {
245      ugi.addToken(userToken);
246    } else if (userProvider.isHadoopSecurityEnabled()) {
247      // we allow this to pass through in "simple" security mode
248      // for mini cluster testing
249      throw new DoNotRetryIOException("User token cannot be null");
250    }
251
252    if (region.getCoprocessorHost() != null) {
253      region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
254    }
255    Map<byte[], List<Path>> map = null;
256
257    try {
258      incrementUgiReference(ugi);
259      // Get the target fs (HBase region server fs) delegation token
260      // Since we have checked the permission via 'preBulkLoadHFile', now let's give
261      // the 'request user' necessary token to operate on the target fs.
262      // After this point the 'doAs' user will hold two tokens, one for the source fs
263      // ('request user'), another for the target fs (HBase region server principal).
264      if (userProvider.isHadoopSecurityEnabled()) {
265        FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider, "renewer");
266        targetfsDelegationToken.acquireDelegationToken(fs);
267
268        Token<?> targetFsToken = targetfsDelegationToken.getUserToken();
269        if (
270          targetFsToken != null
271            && (userToken == null || !targetFsToken.getService().equals(userToken.getService()))
272        ) {
273          ugi.addToken(targetFsToken);
274        }
275      }
276
277      map = ugi.doAs(new PrivilegedAction<Map<byte[], List<Path>>>() {
278        @Override
279        public Map<byte[], List<Path>> run() {
280          FileSystem fs = null;
281          try {
282            /*
283             * This is creating and caching a new FileSystem instance. Other code called "beneath"
284             * this method will rely on this FileSystem instance being in the cache. This is
285             * important as those methods make _no_ attempt to close this FileSystem instance. It is
286             * critical that here, in SecureBulkLoadManager, we are tracking the lifecycle and
287             * closing the FS when safe to do so.
288             */
289            fs = FileSystem.get(conf);
290            for (Pair<byte[], String> el : familyPaths) {
291              Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
292              if (!fs.exists(stageFamily)) {
293                fs.mkdirs(stageFamily);
294                fs.setPermission(stageFamily, PERM_ALL_ACCESS);
295              }
296            }
297            if (fsCreatedListener != null) {
298              fsCreatedListener.accept(region);
299            }
300            // We call bulkLoadHFiles as requesting user
301            // To enable access prior to staging
302            return region.bulkLoadHFiles(familyPaths, true,
303              new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(), clusterIds,
304              request.getReplicate());
305          } catch (Exception e) {
306            LOG.error("Failed to complete bulk load", e);
307          }
308          return null;
309        }
310      });
311    } finally {
312      decrementUgiReference(ugi);
313      try {
314        if (!UserGroupInformation.getLoginUser().equals(ugi) && !isUserReferenced(ugi)) {
315          FileSystem.closeAllForUGI(ugi);
316        }
317      } catch (IOException e) {
318        LOG.error("Failed to close FileSystem for: {}", ugi, e);
319      }
320      if (region.getCoprocessorHost() != null) {
321        region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
322      }
323    }
324    return map;
325  }
326
327  private Path createStagingDir(Path baseDir, User user, TableName tableName) throws IOException {
328    String tblName = tableName.getNameAsString().replace(":", "_");
329    String randomDir = user.getShortName() + "__" + tblName + "__"
330      + (new BigInteger(RANDOM_WIDTH, random).toString(RANDOM_RADIX));
331    return createStagingDir(baseDir, user, randomDir);
332  }
333
334  private Path createStagingDir(Path baseDir, User user, String randomDir) throws IOException {
335    Path p = new Path(baseDir, randomDir);
336    fs.mkdirs(p, PERM_ALL_ACCESS);
337    fs.setPermission(p, PERM_ALL_ACCESS);
338    return p;
339  }
340
341  private User getActiveUser() throws IOException {
342    // for non-rpc handling, fallback to system user
343    User user = RpcServer.getRequestUser().orElse(userProvider.getCurrent());
344    // this is for testing
345    if (
346      userProvider.isHadoopSecurityEnabled()
347        && "simple".equalsIgnoreCase(conf.get(User.HBASE_SECURITY_CONF_KEY))
348    ) {
349      return User.createUserForTesting(conf, user.getShortName(), new String[] {});
350    }
351
352    return user;
353  }
354
355  // package-private for test purpose only
356  static class SecureBulkLoadListener implements BulkLoadListener {
357    // Target filesystem
358    private final FileSystem fs;
359    private final String stagingDir;
360    private final Configuration conf;
361    // Source filesystem
362    private FileSystem srcFs = null;
363    private Map<String, FsPermission> origPermissions = null;
364    private Map<String, String> origSources = null;
365
366    public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) {
367      this.fs = fs;
368      this.stagingDir = stagingDir;
369      this.conf = conf;
370      this.origPermissions = new HashMap<>();
371      this.origSources = new HashMap<>();
372    }
373
374    @Override
375    public String prepareBulkLoad(final byte[] family, final String srcPath, boolean copyFile,
376      String customStaging) throws IOException {
377      Path p = new Path(srcPath);
378
379      // store customStaging for failedBulkLoad
380      String currentStaging = stagingDir;
381      if (StringUtils.isNotEmpty(customStaging)) {
382        currentStaging = customStaging;
383      }
384
385      Path stageP = new Path(currentStaging, new Path(Bytes.toString(family), p.getName()));
386
387      // In case of Replication for bulk load files, hfiles are already copied in staging directory
388      if (p.equals(stageP)) {
389        LOG.debug(
390          p.getName() + " is already available in staging directory. Skipping copy or rename.");
391        return stageP.toString();
392      }
393
394      if (srcFs == null) {
395        srcFs = FileSystem.newInstance(p.toUri(), conf);
396      }
397
398      if (!isFile(p)) {
399        throw new IOException("Path does not reference a file: " + p);
400      }
401
402      // Check to see if the source and target filesystems are the same
403      if (!FSUtils.isSameHdfs(conf, srcFs, fs)) {
404        LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than "
405          + "the destination filesystem. Copying file over to destination staging dir.");
406        FileUtil.copy(srcFs, p, fs, stageP, false, conf);
407      } else if (copyFile) {
408        LOG.debug("Bulk-load file " + srcPath + " is copied to destination staging dir.");
409        FileUtil.copy(srcFs, p, fs, stageP, false, conf);
410      } else {
411        LOG.debug("Moving " + p + " to " + stageP);
412        FileStatus origFileStatus = fs.getFileStatus(p);
413        origPermissions.put(srcPath, origFileStatus.getPermission());
414        origSources.put(stageP.toString(), srcPath);
415        if (!fs.rename(p, stageP)) {
416          throw new IOException("Failed to move HFile: " + p + " to " + stageP);
417        }
418      }
419      fs.setPermission(stageP, PERM_ALL_ACCESS);
420
421      return stageP.toString();
422    }
423
424    @Override
425    public void doneBulkLoad(byte[] family, String srcPath) throws IOException {
426      LOG.debug("Bulk Load done for: " + srcPath);
427      closeSrcFs();
428    }
429
430    private void closeSrcFs() throws IOException {
431      if (srcFs != null) {
432        srcFs.close();
433        srcFs = null;
434      }
435    }
436
437    @Override
438    public void failedBulkLoad(final byte[] family, final String stagedPath) throws IOException {
439      try {
440        String src = origSources.get(stagedPath);
441        if (StringUtils.isEmpty(src)) {
442          LOG.debug(stagedPath + " was not moved to staging. No need to move back");
443          return;
444        }
445
446        Path stageP = new Path(stagedPath);
447        if (!fs.exists(stageP)) {
448          throw new IOException(
449            "Missing HFile: " + stageP + ", can't be moved back to it's original place");
450        }
451
452        // we should not move back files if the original exists
453        Path srcPath = new Path(src);
454        if (srcFs.exists(srcPath)) {
455          LOG.debug(src + " is already at it's original place. No need to move.");
456          return;
457        }
458
459        LOG.debug("Moving " + stageP + " back to " + srcPath);
460        if (!fs.rename(stageP, srcPath)) {
461          throw new IOException("Failed to move HFile: " + stageP + " to " + srcPath);
462        }
463
464        // restore original permission
465        if (origPermissions.containsKey(stagedPath)) {
466          fs.setPermission(srcPath, origPermissions.get(src));
467        } else {
468          LOG.warn("Can't find previous permission for path=" + stagedPath);
469        }
470      } finally {
471        closeSrcFs();
472      }
473    }
474
475    /**
476     * Check if the path is referencing a file. This is mainly needed to avoid symlinks.
477     * @return true if the p is a file
478     */
479    private boolean isFile(Path p) throws IOException {
480      FileStatus status = srcFs.getFileStatus(p);
481      boolean isFile = !status.isDirectory();
482      try {
483        isFile =
484          isFile && !(Boolean) Methods.call(FileStatus.class, status, "isSymlink", null, null);
485      } catch (Exception e) {
486      }
487      return isFile;
488    }
489  }
490}