/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import com.google.errorprone.annotations.RestrictedApi;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;

/**
 * Implementation of {@link TableDescriptors} that reads descriptors from the passed filesystem. It
 * expects descriptors to be in a file in the {@link #TABLEINFO_DIR} subdir of the table's directory
 * in FS. It can be read-only -- i.e. it does not modify the filesystem -- or it can read and write.
 * <p>
 * Also has utility for keeping up the table descriptors tableinfo file. The table schema file is
 * kept in the {@link #TABLEINFO_DIR} subdir of the table directory in the filesystem. It has a
 * {@link #TABLEINFO_FILE_PREFIX} and then a suffix that is the edit sequenceid: e.g.
 * <code>.tableinfo.0000000003</code>. This sequenceid is always increasing. It starts at zero. The
 * table schema file with the highest sequenceid has the most recent schema edit. Usually there is
 * only one file, the most recent, but there may be short periods where there is more than one file.
 * Old files are eventually cleaned up. The presumption is that there will not be lots of concurrent
 * clients making table schema edits. If there are, the below needs a bit of reworking and perhaps
 * some supporting api in hdfs.
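 * <p>
 * For illustration only, a minimal usage sketch (the Configuration and table name are assumed to
 * exist):
 *
 * <pre>
 * // Uses the hbase root dir and filesystem from the configuration; the instance can read and write.
 * FSTableDescriptors fstd = new FSTableDescriptors(conf);
 * // Returns the cached descriptor, or reads the newest tableinfo file from the filesystem.
 * TableDescriptor td = fstd.get(TableName.valueOf("example_table"));
 * </pre>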
 */
@InterfaceAudience.Private
public class FSTableDescriptors implements TableDescriptors {
  private static final Logger LOG = LoggerFactory.getLogger(FSTableDescriptors.class);
  private final FileSystem fs;
  private final Path rootdir;
  private final boolean fsreadonly;
  private final boolean usecache;
  private volatile boolean fsvisited;

  long cachehits = 0;
  long invocations = 0;

  /**
   * The file name prefix used to store HTD in HDFS
   */
  static final String TABLEINFO_FILE_PREFIX = ".tableinfo";

  public static final String TABLEINFO_DIR = ".tabledesc";

  // This cache does not age out the old stuff. Thinking is that the amount
  // of data we keep up in here is so small, no need to do occasional purge.
  // TODO.
  private final Map<TableName, TableDescriptor> cache = new ConcurrentHashMap<>();

  /**
   * Construct a FSTableDescriptors instance using the hbase root dir of the given conf and the
   * filesystem where that root dir lives. This instance can do write operations (is not read only).
   */
  public FSTableDescriptors(final Configuration conf) throws IOException {
    this(CommonFSUtils.getCurrentFileSystem(conf), CommonFSUtils.getRootDir(conf));
  }

  public FSTableDescriptors(final FileSystem fs, final Path rootdir) {
    this(fs, rootdir, false, true);
  }

  public FSTableDescriptors(final FileSystem fs, final Path rootdir, final boolean fsreadonly,
    final boolean usecache) {
    this.fs = fs;
    this.rootdir = rootdir;
    this.fsreadonly = fsreadonly;
    this.usecache = usecache;
  }

  public static void tryUpdateMetaTableDescriptor(Configuration conf) throws IOException {
    tryUpdateAndGetMetaTableDescriptor(conf, CommonFSUtils.getCurrentFileSystem(conf),
      CommonFSUtils.getRootDir(conf));
  }

  public static TableDescriptor tryUpdateAndGetMetaTableDescriptor(Configuration conf,
    FileSystem fs, Path rootdir) throws IOException {
    // see if we already have meta descriptor on fs. Write one if not.
    Optional<Pair<FileStatus, TableDescriptor>> opt = getTableDescriptorFromFs(fs,
      CommonFSUtils.getTableDir(rootdir, TableName.META_TABLE_NAME), false);
    if (opt.isPresent()) {
      return opt.get().getSecond();
    }
    TableDescriptorBuilder builder = createMetaTableDescriptorBuilder(conf);
    TableDescriptor td = StoreFileTrackerFactory.updateWithTrackerConfigs(conf, builder.build());
    LOG.info("Creating new hbase:meta table descriptor {}", td);
    TableName tableName = td.getTableName();
    Path tableDir = CommonFSUtils.getTableDir(rootdir, tableName);
    Path p = writeTableDescriptor(fs, td, tableDir, null);
    if (p == null) {
      throw new IOException("Failed update hbase:meta table descriptor");
    }
    LOG.info("Updated hbase:meta table descriptor to {}", p);
    return td;
  }

  public static ColumnFamilyDescriptor getTableFamilyDescForMeta(final Configuration conf) {
    return ColumnFamilyDescriptorBuilder.newBuilder(HConstants.TABLE_FAMILY)
      .setMaxVersions(
        conf.getInt(HConstants.HBASE_META_VERSIONS, HConstants.DEFAULT_HBASE_META_VERSIONS))
      .setInMemory(true).setBlocksize(8 * 1024).setScope(HConstants.REPLICATION_SCOPE_LOCAL)
      .setDataBlockEncoding(org.apache.hadoop.hbase.io.encoding.DataBlockEncoding.ROW_INDEX_V1)
      .setBloomFilterType(BloomType.ROWCOL).build();
  }

  public static ColumnFamilyDescriptor getReplBarrierFamilyDescForMeta() {
    return ColumnFamilyDescriptorBuilder.newBuilder(HConstants.REPLICATION_BARRIER_FAMILY)
      .setMaxVersions(HConstants.ALL_VERSIONS).setInMemory(true)
      .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
      .setDataBlockEncoding(org.apache.hadoop.hbase.io.encoding.DataBlockEncoding.ROW_INDEX_V1)
      .setBloomFilterType(BloomType.ROWCOL).build();
  }

  public static TableDescriptorBuilder createMetaTableDescriptorBuilder(final Configuration conf)
    throws IOException {
    // TODO We used to set CacheDataInL1 for the META table. Now, when we have BucketCache in file
    // mode, the META table data goes to the file mode BC only. Test how that affects the system.
    // If the impact is too high, we have to rethink adding back setCacheDataInL1 for META table CFs.
    return TableDescriptorBuilder.newBuilder(TableName.META_TABLE_NAME)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.CATALOG_FAMILY)
        .setMaxVersions(
          conf.getInt(HConstants.HBASE_META_VERSIONS, HConstants.DEFAULT_HBASE_META_VERSIONS))
        .setInMemory(true)
        .setBlocksize(
          conf.getInt(HConstants.HBASE_META_BLOCK_SIZE, HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
        .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
        .setDataBlockEncoding(org.apache.hadoop.hbase.io.encoding.DataBlockEncoding.ROW_INDEX_V1)
        .setBloomFilterType(BloomType.ROWCOL).build())
      .setColumnFamily(getTableFamilyDescForMeta(conf))
      .setColumnFamily(getReplBarrierFamilyDescForMeta()).setCoprocessor(
        CoprocessorDescriptorBuilder.newBuilder(MultiRowMutationEndpoint.class.getName())
          .setPriority(Coprocessor.PRIORITY_SYSTEM).build());
  }

  protected boolean isUsecache() {
    return this.usecache;
  }

  /**
   * Get the current table descriptor for the given table, or null if none exists.
   * <p/>
   * Uses a local cache of the descriptor but still checks the filesystem on each call if
   * {@link #fsvisited} is not {@code true}, i.e., we haven't done a full scan yet, to see if a newer
   * file has been created since the cached one was read.
   */
  @Override
  @Nullable
  public TableDescriptor get(TableName tableName) {
    invocations++;
    if (usecache) {
      // Look in cache of descriptors.
      TableDescriptor cachedtdm = this.cache.get(tableName);
      if (cachedtdm != null) {
        cachehits++;
        return cachedtdm;
      }
      // we do not need to go to fs any more
      if (fsvisited) {
        return null;
      }
    }
    TableDescriptor tdmt = null;
    try {
      tdmt = getTableDescriptorFromFs(fs, getTableDir(tableName), fsreadonly).map(Pair::getSecond)
        .orElse(null);
    } catch (IOException ioe) {
      LOG.debug("Exception during readTableDescriptor. Current table name = " + tableName, ioe);
    }
    // last HTD written wins
    if (usecache && tdmt != null) {
      this.cache.put(tableName, tdmt);
    }

    return tdmt;
  }

  /**
   * Returns a map from table name to table descriptor for all tables.
   */
  @Override
  public Map<String, TableDescriptor> getAll() throws IOException {
    Map<String, TableDescriptor> tds = new TreeMap<>();
    if (fsvisited) {
      for (Map.Entry<TableName, TableDescriptor> entry : this.cache.entrySet()) {
        tds.put(entry.getKey().getNameWithNamespaceInclAsString(), entry.getValue());
      }
    } else {
      LOG.trace("Fetching table descriptors from the filesystem.");
      boolean allvisited = usecache;
      for (Path d : FSUtils.getTableDirs(fs, rootdir)) {
        TableDescriptor htd = get(CommonFSUtils.getTableName(d));
        if (htd == null) {
          allvisited = false;
        } else {
          tds.put(htd.getTableName().getNameWithNamespaceInclAsString(), htd);
        }
      }
      fsvisited = allvisited;
    }
    return tds;
  }

  /**
   * Find descriptors by namespace.
   * @see #get(org.apache.hadoop.hbase.TableName)
   */
  @Override
  public Map<String, TableDescriptor> getByNamespace(String name) throws IOException {
    Map<String, TableDescriptor> htds = new TreeMap<>();
    List<Path> tableDirs =
      FSUtils.getLocalTableDirs(fs, CommonFSUtils.getNamespaceDir(rootdir, name));
    for (Path d : tableDirs) {
      TableDescriptor htd = get(CommonFSUtils.getTableName(d));
      if (htd == null) {
        continue;
      }
      htds.put(CommonFSUtils.getTableName(d).getNameAsString(), htd);
    }
    return htds;
  }

  @Override
  public void update(TableDescriptor td, boolean cacheOnly) throws IOException {
    // TODO: in fact this method will only be called at master side, so fsreadonly will always be
    // false and usecache will always be true. In general, we'd better have a
    // ReadOnlyFSTableDescriptors for HRegionServer but for now, HMaster extends HRegionServer, so
    // unless we make use of generics, we can not have different implementations for HMaster and
    // HRegionServer. Revisit this when we make HMaster not extend HRegionServer in the future.
    if (fsreadonly) {
      throw new UnsupportedOperationException("Cannot add a table descriptor - in read only mode");
    }
    if (!cacheOnly) {
      updateTableDescriptor(td);
    }
    if (usecache) {
      this.cache.put(td.getTableName(), td);
    }
  }

  @RestrictedApi(explanation = "Should only be called in tests or self", link = "",
      allowedOnPath = ".*/src/test/.*|.*/FSTableDescriptors\\.java")
  Path updateTableDescriptor(TableDescriptor td) throws IOException {
    TableName tableName = td.getTableName();
    Path tableDir = getTableDir(tableName);
    Path p = writeTableDescriptor(fs, td, tableDir,
      getTableDescriptorFromFs(fs, tableDir, fsreadonly).map(Pair::getFirst).orElse(null));
    if (p == null) {
      throw new IOException("Failed update");
    }
    LOG.info("Updated tableinfo=" + p);
    return p;
  }

  /**
   * Removes the table descriptor from the local cache and returns it. If not in read only mode, it
   * also deletes the entire table directory(!) from the FileSystem.
   */
  @Override
  public TableDescriptor remove(final TableName tablename) throws IOException {
    if (fsreadonly) {
      throw new NotImplementedException("Cannot remove a table descriptor - in read only mode");
    }
    Path tabledir = getTableDir(tablename);
    if (this.fs.exists(tabledir)) {
      if (!this.fs.delete(tabledir, true)) {
        throw new IOException("Failed delete of " + tabledir.toString());
      }
    }
    TableDescriptor descriptor = this.cache.remove(tablename);
    return descriptor;
  }

  /**
   * Check whether we have a valid TableDescriptor.
   */
  public static boolean isTableDir(FileSystem fs, Path tableDir) throws IOException {
    return getTableDescriptorFromFs(fs, tableDir, true).isPresent();
  }

  /**
   * Compare {@link FileStatus} instances by {@link Path#getName()}. Returns in reverse order.
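   * <p>
   * For example (file names are illustrative), sorting with this comparator puts
   * <code>.tableinfo.0000000004.98</code> before <code>.tableinfo.0000000003.123</code>, so the
   * file with the highest sequence id comes first.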
   */
  static final Comparator<FileStatus> TABLEINFO_FILESTATUS_COMPARATOR =
    new Comparator<FileStatus>() {
      @Override
      public int compare(FileStatus left, FileStatus right) {
        return right.getPath().getName().compareTo(left.getPath().getName());
      }
    };

  /**
   * Return the table directory in HDFS
   */
  private Path getTableDir(TableName tableName) {
    return CommonFSUtils.getTableDir(rootdir, tableName);
  }

  private static final PathFilter TABLEINFO_PATHFILTER = new PathFilter() {
    @Override
    public boolean accept(Path p) {
      // Accept any file that starts with TABLEINFO_FILE_PREFIX
      return p.getName().startsWith(TABLEINFO_FILE_PREFIX);
    }
  };

  /**
   * Width of the sequenceid that is a suffix on a tableinfo file.
   */
  static final int WIDTH_OF_SEQUENCE_ID = 10;

  /**
   * @param number Number to use as suffix.
   * @return Returns a zero-prefixed decimal version of the passed number (takes the absolute value
   *         in case the number is negative).
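   *         For example (illustrative), passing 3 yields <code>"0000000003"</code>, i.e. ten
   *         digits, zero padded per {@link #WIDTH_OF_SEQUENCE_ID}.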
   */
  private static String formatTableInfoSequenceId(final int number) {
    byte[] b = new byte[WIDTH_OF_SEQUENCE_ID];
    int d = Math.abs(number);
    for (int i = b.length - 1; i >= 0; i--) {
      b[i] = (byte) ((d % 10) + '0');
      d /= 10;
    }
    return Bytes.toString(b);
  }

  static final class SequenceIdAndFileLength {

    final int sequenceId;

    final int fileLength;

    SequenceIdAndFileLength(int sequenceId, int fileLength) {
      this.sequenceId = sequenceId;
      this.fileLength = fileLength;
    }
  }

  /**
   * Returns the current sequence id and file length or 0 if none found.
   * @param p Path to a <code>.tableinfo</code> file.
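   *          For example (file names are illustrative): <code>.tableinfo.0000000003.123</code>
   *          parses to sequence id 3 and file length 123, <code>.tableinfo.0000000003</code> to
   *          sequence id 3 and file length 0, and a bare <code>.tableinfo</code> to 0 and 0.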
   */
  @RestrictedApi(explanation = "Should only be called in tests or self", link = "",
      allowedOnPath = ".*/src/test/.*|.*/FSTableDescriptors\\.java")
  static SequenceIdAndFileLength getTableInfoSequenceIdAndFileLength(Path p) {
    String name = p.getName();
    if (!name.startsWith(TABLEINFO_FILE_PREFIX)) {
      throw new IllegalArgumentException("Invalid table descriptor file name: " + name);
    }
    int firstDot = name.indexOf('.', TABLEINFO_FILE_PREFIX.length());
    if (firstDot < 0) {
      // oldest style where we have neither the sequence id nor the file length
      return new SequenceIdAndFileLength(0, 0);
    }
    int secondDot = name.indexOf('.', firstDot + 1);
    if (secondDot < 0) {
      // old style where we do not have the file length
      int sequenceId = Integer.parseInt(name.substring(firstDot + 1));
      return new SequenceIdAndFileLength(sequenceId, 0);
    }
    int sequenceId = Integer.parseInt(name.substring(firstDot + 1, secondDot));
    int fileLength = Integer.parseInt(name.substring(secondDot + 1));
    return new SequenceIdAndFileLength(sequenceId, fileLength);
  }

  /**
   * Returns the name of the tableinfo file.
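   * <p>
   * For example (values are illustrative), sequence id 4 and a 98 byte serialized descriptor give
   * <code>.tableinfo.0000000004.98</code>.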
   */
  @RestrictedApi(explanation = "Should only be called in tests or self", link = "",
      allowedOnPath = ".*/src/test/.*|.*/FSTableDescriptors\\.java")
  static String getTableInfoFileName(int sequenceId, byte[] content) {
    return TABLEINFO_FILE_PREFIX + "." + formatTableInfoSequenceId(sequenceId) + "."
      + content.length;
  }

  /**
   * Returns the latest table descriptor for the given table directly from the file system if it
   * exists, bypassing the local cache. Returns null if it's not found.
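   * <p>
   * A minimal usage sketch (the variable and table names are hypothetical):
   *
   * <pre>
   * TableDescriptor td =
   *   FSTableDescriptors.getTableDescriptorFromFs(fs, rootDir, TableName.valueOf("example_table"));
   * </pre>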
   */
  public static TableDescriptor getTableDescriptorFromFs(FileSystem fs, Path hbaseRootDir,
    TableName tableName) throws IOException {
    Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
    return getTableDescriptorFromFs(fs, tableDir);
  }

  /**
   * Returns the latest table descriptor for the table located at the given directory directly from
   * the file system if it exists.
   */
  public static TableDescriptor getTableDescriptorFromFs(FileSystem fs, Path tableDir)
    throws IOException {
    return getTableDescriptorFromFs(fs, tableDir, true).map(Pair::getSecond).orElse(null);
  }

  private static void deleteMalformedFile(FileSystem fs, Path file) throws IOException {
    LOG.info("Delete malformed table descriptor file {}", file);
    if (!fs.delete(file, false)) {
      LOG.warn("Failed to delete malformed table descriptor file {}", file);
    }
  }

  private static Optional<Pair<FileStatus, TableDescriptor>> getTableDescriptorFromFs(FileSystem fs,
    Path tableDir, boolean readonly) throws IOException {
    Path tableInfoDir = new Path(tableDir, TABLEINFO_DIR);
    FileStatus[] descFiles = CommonFSUtils.listStatus(fs, tableInfoDir, TABLEINFO_PATHFILTER);
    if (descFiles == null || descFiles.length < 1) {
      return Optional.empty();
    }
    Arrays.sort(descFiles, TABLEINFO_FILESTATUS_COMPARATOR);
    int i = 0;
    TableDescriptor td = null;
    FileStatus descFile = null;
    for (; i < descFiles.length; i++) {
      descFile = descFiles[i];
      Path file = descFile.getPath();
      // get file length from file name if present
      int fileLength = getTableInfoSequenceIdAndFileLength(file).fileLength;
      byte[] content = new byte[fileLength > 0 ? fileLength : Ints.checkedCast(descFile.getLen())];
      try (FSDataInputStream in = fs.open(file)) {
        in.readFully(content);
      } catch (EOFException e) {
        LOG.info("Failed to load file {} due to EOF, it is probably half written: {}", file,
          e.toString());
        if (!readonly) {
          deleteMalformedFile(fs, file);
        }
        continue;
      }
      try {
        td = TableDescriptorBuilder.parseFrom(content);
        break;
      } catch (DeserializationException e) {
        LOG.info("Failed to parse file {} due to malformed protobuf message: {}", file,
          e.toString());
        if (!readonly) {
          deleteMalformedFile(fs, file);
        }
      }
    }
    if (!readonly) {
      // i + 1 to skip the one we loaded
      for (i = i + 1; i < descFiles.length; i++) {
        Path file = descFiles[i].getPath();
        LOG.info("Delete old table descriptor file {}", file);
        if (!fs.delete(file, false)) {
          LOG.info("Failed to delete old table descriptor file {}", file);
        }
      }
    }
    return td != null ? Optional.of(Pair.newPair(descFile, td)) : Optional.empty();
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public static void deleteTableDescriptors(FileSystem fs, Path tableDir) throws IOException {
    Path tableInfoDir = new Path(tableDir, TABLEINFO_DIR);
    deleteTableDescriptorFiles(fs, tableInfoDir, Integer.MAX_VALUE);
  }

  /**
   * Deletes files matching the table info file pattern within the given directory whose sequenceId
   * is at most the given max sequenceId.
   */
  private static void deleteTableDescriptorFiles(FileSystem fs, Path dir, int maxSequenceId)
    throws IOException {
    FileStatus[] status = CommonFSUtils.listStatus(fs, dir, TABLEINFO_PATHFILTER);
    for (FileStatus file : status) {
      Path path = file.getPath();
      int sequenceId = getTableInfoSequenceIdAndFileLength(path).sequenceId;
      if (sequenceId <= maxSequenceId) {
        boolean success = CommonFSUtils.delete(fs, path, false);
        if (success) {
          LOG.debug("Deleted {}", path);
        } else {
          LOG.error("Failed to delete table descriptor at {}", path);
        }
      }
    }
  }

  /**
   * Attempts to write a new table descriptor to the given table's directory. It begins at the
   * currentSequenceId + 1 and tries 10 times to find a new sequence number not already in use.
   * <p/>
   * Removes the current descriptor file if passed in.
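   * <p>
   * For example (file names are illustrative), if the current descriptor file is
   * <code>.tableinfo.0000000003.123</code>, the next attempt writes a file named with sequence id 4
   * and the new content length, e.g. <code>.tableinfo.0000000004.130</code> for a 130 byte
   * descriptor; on a name collision the sequence id is bumped again, up to 10 attempts.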
   * @return Descriptor file or null if the write failed.
   */
  private static Path writeTableDescriptor(final FileSystem fs, final TableDescriptor td,
    final Path tableDir, final FileStatus currentDescriptorFile) throws IOException {
    // Here we will write to the final directory directly to avoid renaming, as on OSS renaming is
    // not atomic and has performance issues. The reason why we can do this is that, in the below
    // code, we will not overwrite existing files; we will write a new file instead. And when
    // loading, we will skip the half written file, please see the code in getTableDescriptorFromFs
    Path tableInfoDir = new Path(tableDir, TABLEINFO_DIR);

    // In proc v2 we have a table lock so typically there will be no concurrent writes. Keep the
    // retry logic here since we may still want to write the table descriptor from, for example,
    // HBCK2.
    int currentSequenceId = currentDescriptorFile == null
      ? 0
      : getTableInfoSequenceIdAndFileLength(currentDescriptorFile.getPath()).sequenceId;

    // Put an arbitrary upper bound on how often we retry
    int maxAttempts = 10;
    int maxSequenceId = currentSequenceId + maxAttempts;
    byte[] bytes = TableDescriptorBuilder.toByteArray(td);
    for (int newSequenceId = currentSequenceId + 1; newSequenceId
        <= maxSequenceId; newSequenceId++) {
      String fileName = getTableInfoFileName(newSequenceId, bytes);
      Path filePath = new Path(tableInfoDir, fileName);
      try (FSDataOutputStream out = fs.create(filePath, false)) {
        out.write(bytes);
      } catch (FileAlreadyExistsException e) {
        LOG.debug("{} exists; retrying up to {} times", filePath, maxAttempts, e);
        continue;
      } catch (IOException e) {
        LOG.debug("Failed write {}; retrying up to {} times", filePath, maxAttempts, e);
        continue;
      }
      deleteTableDescriptorFiles(fs, tableInfoDir, newSequenceId - 1);
      return filePath;
    }
    return null;
  }

  /**
   * Create a new TableDescriptor in HDFS. Happens when we are creating a table. Used by tests.
   * @return True if we successfully created the file.
   */
  public boolean createTableDescriptor(TableDescriptor htd) throws IOException {
    return createTableDescriptor(htd, false);
  }

  /**
   * Create a new TableDescriptor in HDFS. Happens when we are creating a table. If forceCreation is
   * true then even if a previous table descriptor is present it will be overwritten.
   * @return True if we successfully created the file.
   */
  public boolean createTableDescriptor(TableDescriptor htd, boolean forceCreation)
    throws IOException {
    Path tableDir = getTableDir(htd.getTableName());
    return createTableDescriptorForTableDirectory(tableDir, htd, forceCreation);
  }

  /**
   * Create a new TableDescriptor in HDFS in the specified table directory. Happens when we create a
   * new table during cluster start or in the Clone and Create Table Procedures. Checks the readOnly
   * flag passed on construction.
   * @param tableDir      table directory under which we should write the file
   * @param htd           description of the table to write
   * @param forceCreation if <tt>true</tt>, then even if a previous table descriptor is present it
   *                      will be overwritten
   * @return <tt>true</tt> if we successfully created the file, <tt>false</tt> if the file already
   *         exists and we weren't forcing the descriptor creation.
   * @throws IOException if a filesystem error occurs
   */
  public boolean createTableDescriptorForTableDirectory(Path tableDir, TableDescriptor htd,
    boolean forceCreation) throws IOException {
    if (this.fsreadonly) {
      throw new NotImplementedException("Cannot create a table descriptor - in read only mode");
    }
    return createTableDescriptorForTableDirectory(this.fs, tableDir, htd, forceCreation);
  }

  /**
   * Create a new TableDescriptor in HDFS in the specified table directory. Happens when we create a
   * new table during snapshotting. Does not enforce read-only; that is for the caller to determine.
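   * <p>
   * A minimal usage sketch (the variable names are hypothetical):
   *
   * <pre>
   * // Force-write the descriptor even if one already exists under workingTableDir.
   * FSTableDescriptors.createTableDescriptorForTableDirectory(fs, workingTableDir, htd, true);
   * </pre>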
   * @param fs            Filesystem to use.
   * @param tableDir      table directory under which we should write the file
   * @param htd           description of the table to write
   * @param forceCreation if <tt>true</tt>, then even if a previous table descriptor is present it
   *                      will be overwritten
   * @return <tt>true</tt> if we successfully created the file, <tt>false</tt> if the file already
   *         exists and we weren't forcing the descriptor creation.
   * @throws IOException if a filesystem error occurs
   */
  public static boolean createTableDescriptorForTableDirectory(FileSystem fs, Path tableDir,
    TableDescriptor htd, boolean forceCreation) throws IOException {
    Optional<Pair<FileStatus, TableDescriptor>> opt = getTableDescriptorFromFs(fs, tableDir, false);
    if (opt.isPresent()) {
      LOG.debug("Current path={}", opt.get().getFirst());
      if (!forceCreation) {
        if (htd.equals(opt.get().getSecond())) {
          LOG.trace("TableInfo already exists.. Skipping creation");
          return false;
        }
      }
    }
    return writeTableDescriptor(fs, htd, tableDir, opt.map(Pair::getFirst).orElse(null)) != null;
  }
}