001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.fs;
019
import java.io.IOException;
import java.util.Collection;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
039
040@InterfaceAudience.Private
041public final class ErasureCodingUtils {
042
043  private ErasureCodingUtils() {
044  }
045
046  private static final Logger LOG = LoggerFactory.getLogger(ErasureCodingUtils.class);
047
048  /**
049   * Runs checks against the FileSystem, verifying that HDFS is supported and the policy is
050   * available, enabled, and works with a simple write.
051   */
052  public static void verifySupport(Configuration conf, String policy) throws HBaseIOException {
053    DistributedFileSystem dfs = getDfs(conf);
054    checkAvailable(dfs, policy);
055
056    // Enable the policy on a test directory. Try writing ot it to ensure that HDFS allows it
057    // This acts as a safeguard against topology issues (not enough nodes for policy, etc) and
058    // anything else. This is otherwise hard to validate more directly.
059    Path globalTempDir = new Path(conf.get(HConstants.HBASE_DIR), HConstants.HBASE_TEMP_DIRECTORY);
060    Path currentTempDir = createTempDir(dfs, globalTempDir);
061    try {
062      setPolicy(dfs, currentTempDir, policy);
063      try (FSDataOutputStream out = dfs.create(new Path(currentTempDir, "test.out"))) {
064        out.writeUTF("Testing " + policy);
065      }
066    } catch (IOException e) {
067      throw new DoNotRetryIOException("Failed write test for EC policy. Check cause or logs", e);
068    } finally {
069      try {
070        dfs.delete(currentTempDir, true);
071      } catch (IOException e) {
072        LOG.warn("Failed to delete temp path for ec test", e);
073      }
074    }
075  }
076
077  private static Path createTempDir(FileSystem fs, Path tempDir) throws HBaseIOException {
078    Path currentTempDir = new Path(tempDir, "ec-test-" + System.currentTimeMillis());
079    try {
080      fs.mkdirs(currentTempDir);
081      fs.deleteOnExit(currentTempDir);
082    } catch (IOException e) {
083      throw new HBaseIOException("Failed to create test dir for EC write test", e);
084    }
085    return currentTempDir;
086  }
087
088  private static void checkAvailable(DistributedFileSystem dfs, String policy)
089    throws HBaseIOException {
090    Collection<ErasureCodingPolicyInfo> policies;
091    try {
092      policies = dfs.getAllErasureCodingPolicies();
093    } catch (IOException e) {
094      throw new HBaseIOException("Failed to check for Erasure Coding policy: " + policy, e);
095    }
096    for (ErasureCodingPolicyInfo policyInfo : policies) {
097      if (policyInfo.getPolicy().getName().equals(policy)) {
098        if (!policyInfo.isEnabled()) {
099          throw new DoNotRetryIOException("Cannot set Erasure Coding policy: " + policy
100            + ". The policy must be enabled, but has state " + policyInfo.getState());
101        }
102        return;
103      }
104    }
105    throw new DoNotRetryIOException(
106      "Cannot set Erasure Coding policy: " + policy + ". Policy not found. Available policies are: "
107        + policies.stream().map(p -> p.getPolicy().getName()).collect(Collectors.joining(", ")));
108  }
109
110  /**
111   * Check if EC policy is different between two descriptors
112   * @return true if a sync is necessary
113   */
114  public static boolean needsSync(TableDescriptor oldDescriptor, TableDescriptor newDescriptor) {
115    String newPolicy = oldDescriptor.getErasureCodingPolicy();
116    String oldPolicy = newDescriptor.getErasureCodingPolicy();
117    return !Objects.equals(oldPolicy, newPolicy);
118  }
119
120  /**
121   * Sync the EC policy state from the newDescriptor onto the FS for the table dir of the provided
122   * table descriptor. If the policy is null, we will remove erasure coding from the FS for the
123   * table dir. If it's non-null, we'll set it to that policy.
124   * @param newDescriptor descriptor containing the policy and table name
125   */
126  public static void sync(FileSystem fs, Path rootDir, TableDescriptor newDescriptor)
127    throws IOException {
128    String newPolicy = newDescriptor.getErasureCodingPolicy();
129    if (newPolicy == null) {
130      unsetPolicy(fs, rootDir, newDescriptor.getTableName());
131    } else {
132      setPolicy(fs, rootDir, newDescriptor.getTableName(), newPolicy);
133    }
134  }
135
136  /**
137   * Sets the EC policy on the table directory for the specified table
138   */
139  public static void setPolicy(FileSystem fs, Path rootDir, TableName tableName, String policy)
140    throws IOException {
141    Path path = CommonFSUtils.getTableDir(rootDir, tableName);
142    setPolicy(fs, path, policy);
143  }
144
145  /**
146   * Sets the EC policy on the path
147   */
148  public static void setPolicy(FileSystem fs, Path path, String policy) throws IOException {
149    getDfs(fs).setErasureCodingPolicy(path, policy);
150  }
151
152  /**
153   * Unsets any EC policy specified on the path.
154   */
155  public static void unsetPolicy(FileSystem fs, Path rootDir, TableName tableName)
156    throws IOException {
157    DistributedFileSystem dfs = getDfs(fs);
158    Path path = CommonFSUtils.getTableDir(rootDir, tableName);
159    if (dfs.getErasureCodingPolicy(path) == null) {
160      LOG.warn("No EC policy set for path {}, nothing to unset", path);
161      return;
162    }
163    dfs.unsetErasureCodingPolicy(path);
164  }
165
166  private static DistributedFileSystem getDfs(Configuration conf) throws HBaseIOException {
167    try {
168      return getDfs(FileSystem.get(conf));
169    } catch (DoNotRetryIOException e) {
170      throw e;
171    } catch (IOException e) {
172      throw new HBaseIOException("Failed to get FileSystem from conf", e);
173    }
174
175  }
176
177  private static DistributedFileSystem getDfs(FileSystem fs) throws DoNotRetryIOException {
178    if (!(fs instanceof DistributedFileSystem)) {
179      throw new DoNotRetryIOException(
180        "Cannot manage Erasure Coding policy. Erasure Coding is only available on HDFS, but fs is "
181          + fs.getClass().getSimpleName());
182    }
183    return (DistributedFileSystem) fs;
184  }
185}