001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import edu.umd.cs.findbugs.annotations.Nullable;
022import java.io.EOFException;
023import java.io.IOException;
024import java.io.InterruptedIOException;
025import java.util.Arrays;
026import java.util.Comparator;
027import java.util.List;
028import java.util.Map;
029import java.util.Optional;
030import java.util.TreeMap;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.ConcurrentSkipListMap;
033import java.util.concurrent.CountDownLatch;
034import java.util.concurrent.LinkedBlockingQueue;
035import java.util.concurrent.ThreadPoolExecutor;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.atomic.AtomicBoolean;
038import org.apache.commons.lang3.NotImplementedException;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.fs.FSDataInputStream;
041import org.apache.hadoop.fs.FSDataOutputStream;
042import org.apache.hadoop.fs.FileAlreadyExistsException;
043import org.apache.hadoop.fs.FileStatus;
044import org.apache.hadoop.fs.FileSystem;
045import org.apache.hadoop.fs.Path;
046import org.apache.hadoop.fs.PathFilter;
047import org.apache.hadoop.hbase.Coprocessor;
048import org.apache.hadoop.hbase.HConstants;
049import org.apache.hadoop.hbase.TableDescriptors;
050import org.apache.hadoop.hbase.TableName;
051import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
052import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
053import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
054import org.apache.hadoop.hbase.client.TableDescriptor;
055import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
056import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
057import org.apache.hadoop.hbase.exceptions.DeserializationException;
058import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
059import org.apache.hadoop.hbase.regionserver.BloomType;
060import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
061import org.apache.yetus.audience.InterfaceAudience;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
066import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
067
068/**
069 * Implementation of {@link TableDescriptors} that reads descriptors from the passed filesystem. It
070 * expects descriptors to be in a file in the {@link #TABLEINFO_DIR} subdir of the table's directory
071 * in FS. Can be read-only -- i.e. does not modify the filesystem or can be read and write.
072 * <p>
073 * Also has utility for keeping up the table descriptors tableinfo file. The table schema file is
074 * kept in the {@link #TABLEINFO_DIR} subdir of the table directory in the filesystem. It has a
075 * {@link #TABLEINFO_FILE_PREFIX} and then a suffix that is the edit sequenceid: e.g.
076 * <code>.tableinfo.0000000003</code>. This sequenceid is always increasing. It starts at zero. The
 * table schema file with the highest sequenceid has the most recent schema edit. Usually there is
 * only one file, the most recent, but there may be short periods where there is more than one file.
079 * Old files are eventually cleaned. Presumption is that there will not be lots of concurrent
080 * clients making table schema edits. If so, the below needs a bit of a reworking and perhaps some
081 * supporting api in hdfs.
082 */
083@InterfaceAudience.Private
084public class FSTableDescriptors implements TableDescriptors {
  private static final Logger LOG = LoggerFactory.getLogger(FSTableDescriptors.class);
  // Filesystem the descriptor files are read from (and written to when not read-only).
  private final FileSystem fs;
  // Root directory under which table directories are resolved.
  private final Path rootdir;
  // When true, update/remove/create on this instance throw, and reads never delete files.
  private final boolean fsreadonly;
  // When true, descriptors are kept in the cache map below after being loaded or updated.
  private final boolean usecache;
  // Assigned at the end of a getAll() filesystem scan; true only if caching is on and every
  // table dir yielded a descriptor. Reset to false by invalidateTableDescriptorCache().
  private volatile boolean fsvisited;
  // True only when the constructor was given a positive parallel-load thread count.
  private boolean tableDescriptorParallelLoadEnable = false;
  // Pool used by getAll() for parallel loading; only assigned when parallel loading is enabled.
  private ThreadPoolExecutor executor;

  // Counters incremented by get(). They are plain longs with no synchronization.
  long cachehits = 0;
  long invocations = 0;

  /**
   * The file name prefix used to store HTD in HDFS
   */
  static final String TABLEINFO_FILE_PREFIX = ".tableinfo";
  /**
   * Name of the subdirectory of a table directory that holds the tableinfo files.
   */
  public static final String TABLEINFO_DIR = ".tabledesc";

  // This cache does not age out the old stuff. Thinking is that the amount
  // of data we keep up in here is so small, no need to do occasional purge.
  // TODO.
  private final Map<TableName, TableDescriptor> cache = new ConcurrentHashMap<>();
107
  /**
   * Construct a FSTableDescriptors instance using the hbase root dir of the given conf and the
   * filesystem where that root dir lives. This instance can do write operations (is not read only).
   * @param conf configuration used to look up the current filesystem and the root dir
   * @throws IOException if looking up the filesystem or the root dir from {@code conf} fails
   */
  public FSTableDescriptors(final Configuration conf) throws IOException {
    this(CommonFSUtils.getCurrentFileSystem(conf), CommonFSUtils.getRootDir(conf));
  }
115
  /**
   * Construct a writable, caching instance (passes {@code fsreadonly = false} and
   * {@code usecache = true} to the four-argument constructor).
   * @param fs      filesystem holding the table directories
   * @param rootdir root directory under which table directories are resolved
   */
  public FSTableDescriptors(final FileSystem fs, final Path rootdir) {
    this(fs, rootdir, false, true);
  }
119
  /**
   * Construct an instance with parallel descriptor loading disabled (passes a thread count of 0 to
   * the five-argument constructor).
   * @param fs         filesystem holding the table directories
   * @param rootdir    root directory under which table directories are resolved
   * @param fsreadonly if {@code true}, this instance refuses to modify the filesystem
   * @param usecache   if {@code true}, loaded and updated descriptors are cached in memory
   */
  public FSTableDescriptors(final FileSystem fs, final Path rootdir, final boolean fsreadonly,
    final boolean usecache) {
    this(fs, rootdir, fsreadonly, usecache, 0);
  }
124
125  public FSTableDescriptors(final FileSystem fs, final Path rootdir, final boolean fsreadonly,
126    final boolean usecache, final int tableDescriptorParallelLoadThreads) {
127    this.fs = fs;
128    this.rootdir = rootdir;
129    this.fsreadonly = fsreadonly;
130    this.usecache = usecache;
131    if (tableDescriptorParallelLoadThreads > 0) {
132      tableDescriptorParallelLoadEnable = true;
133      executor = new ThreadPoolExecutor(tableDescriptorParallelLoadThreads,
134        tableDescriptorParallelLoadThreads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
135        new ThreadFactoryBuilder().setNameFormat("FSTableDescriptorLoad-pool-%d").setDaemon(true)
136          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
137      executor.allowCoreThreadTimeOut(true);
138    }
139  }
140
141  public static void tryUpdateMetaTableDescriptor(Configuration conf) throws IOException {
142    tryUpdateAndGetMetaTableDescriptor(conf, CommonFSUtils.getCurrentFileSystem(conf),
143      CommonFSUtils.getRootDir(conf));
144  }
145
146  public static TableDescriptor tryUpdateAndGetMetaTableDescriptor(Configuration conf,
147    FileSystem fs, Path rootdir) throws IOException {
148    // see if we already have meta descriptor on fs. Write one if not.
149    Optional<Pair<FileStatus, TableDescriptor>> opt = getTableDescriptorFromFs(fs,
150      CommonFSUtils.getTableDir(rootdir, TableName.META_TABLE_NAME), false);
151    if (opt.isPresent()) {
152      return opt.get().getSecond();
153    }
154    TableDescriptorBuilder builder = createMetaTableDescriptorBuilder(conf);
155    TableDescriptor td = StoreFileTrackerFactory.updateWithTrackerConfigs(conf, builder.build());
156    LOG.info("Creating new {} table descriptor {}", TableName.META_TABLE_NAME, td);
157    TableName tableName = td.getTableName();
158    Path tableDir = CommonFSUtils.getTableDir(rootdir, tableName);
159    Path p = writeTableDescriptor(fs, td, tableDir, null);
160    if (p == null) {
161      throw new IOException("Failed update " + TableName.META_TABLE_NAME + " table descriptor");
162    }
163    LOG.info("Updated {} table descriptor to {}", TableName.META_TABLE_NAME, p);
164    return td;
165  }
166
167  public static ColumnFamilyDescriptor getTableFamilyDescForMeta(final Configuration conf) {
168    return ColumnFamilyDescriptorBuilder.newBuilder(HConstants.TABLE_FAMILY)
169      .setMaxVersions(
170        conf.getInt(HConstants.HBASE_META_VERSIONS, HConstants.DEFAULT_HBASE_META_VERSIONS))
171      .setInMemory(true).setBlocksize(8 * 1024).setScope(HConstants.REPLICATION_SCOPE_LOCAL)
172      .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1).setBloomFilterType(BloomType.ROWCOL)
173      .build();
174  }
175
176  public static ColumnFamilyDescriptor getReplBarrierFamilyDescForMeta() {
177    return ColumnFamilyDescriptorBuilder.newBuilder(HConstants.REPLICATION_BARRIER_FAMILY)
178      .setMaxVersions(HConstants.ALL_VERSIONS).setInMemory(true)
179      .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
180      .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1).setBloomFilterType(BloomType.ROWCOL)
181      .build();
182  }
183
184  public static ColumnFamilyDescriptor getNamespaceFamilyDescForMeta(Configuration conf) {
185    return ColumnFamilyDescriptorBuilder.newBuilder(HConstants.NAMESPACE_FAMILY)
186      .setMaxVersions(
187        conf.getInt(HConstants.HBASE_META_VERSIONS, HConstants.DEFAULT_HBASE_META_VERSIONS))
188      .setInMemory(true)
189      .setBlocksize(
190        conf.getInt(HConstants.HBASE_META_BLOCK_SIZE, HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
191      .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
192      .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1).setBloomFilterType(BloomType.ROWCOL)
193      .build();
194  }
195
196  private static TableDescriptorBuilder createMetaTableDescriptorBuilder(final Configuration conf)
197    throws IOException {
198    // TODO We used to set CacheDataInL1 for META table. When we have BucketCache in file mode, now
199    // the META table data goes to File mode BC only. Test how that affect the system. If too much,
200    // we have to rethink about adding back the setCacheDataInL1 for META table CFs.
201    return TableDescriptorBuilder.newBuilder(TableName.META_TABLE_NAME)
202      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.CATALOG_FAMILY)
203        .setMaxVersions(
204          conf.getInt(HConstants.HBASE_META_VERSIONS, HConstants.DEFAULT_HBASE_META_VERSIONS))
205        .setInMemory(true)
206        .setBlocksize(
207          conf.getInt(HConstants.HBASE_META_BLOCK_SIZE, HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
208        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROWCOL)
209        .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1).build())
210      .setColumnFamily(getTableFamilyDescForMeta(conf))
211      .setColumnFamily(getReplBarrierFamilyDescForMeta())
212      .setColumnFamily(getNamespaceFamilyDescForMeta(conf)).setCoprocessor(
213        CoprocessorDescriptorBuilder.newBuilder(MultiRowMutationEndpoint.class.getName())
214          .setPriority(Coprocessor.PRIORITY_SYSTEM).build());
215  }
216
217  protected boolean isUsecache() {
218    return this.usecache;
219  }
220
221  /**
222   * Get the current table descriptor for the given table, or null if none exists.
223   * <p/>
224   * Uses a local cache of the descriptor but still checks the filesystem on each call if
225   * {@link #fsvisited} is not {@code true}, i.e, we haven't done a full scan yet, to see if a newer
226   * file has been created since the cached one was read.
227   */
228  @Override
229  @Nullable
230  public TableDescriptor get(TableName tableName) {
231    invocations++;
232    if (usecache) {
233      // Look in cache of descriptors.
234      TableDescriptor cachedtdm = this.cache.get(tableName);
235      if (cachedtdm != null) {
236        cachehits++;
237        return cachedtdm;
238      }
239      // we do not need to go to the fs anymore
240      if (fsvisited) {
241        return null;
242      }
243    }
244    TableDescriptor tdmt = null;
245    try {
246      tdmt = getTableDescriptorFromFs(fs, getTableDir(tableName), fsreadonly).map(Pair::getSecond)
247        .orElse(null);
248    } catch (IOException ioe) {
249      LOG.debug("Exception during readTableDecriptor. Current table name = " + tableName, ioe);
250    }
251    // last HTD written wins
252    if (usecache && tdmt != null) {
253      this.cache.put(tableName, tdmt);
254    }
255
256    return tdmt;
257  }
258
259  /**
260   * Returns a map from table name to table descriptor for all tables.
261   */
262  @Override
263  public Map<String, TableDescriptor> getAll() throws IOException {
264    Map<String, TableDescriptor> tds = new ConcurrentSkipListMap<>();
265    if (fsvisited) {
266      for (Map.Entry<TableName, TableDescriptor> entry : this.cache.entrySet()) {
267        tds.put(entry.getKey().getNameWithNamespaceInclAsString(), entry.getValue());
268      }
269    } else {
270      LOG.info("Fetching table descriptors from the filesystem.");
271      final long startTime = EnvironmentEdgeManager.currentTime();
272      AtomicBoolean allvisited = new AtomicBoolean(usecache);
273      List<Path> tableDirs =
274        FSUtils.getTableDirs(fs, rootdir).stream().filter(FSUtils::isLocalMetaTable).toList();
275      if (!tableDescriptorParallelLoadEnable) {
276        for (Path dir : tableDirs) {
277          internalGet(dir, tds, allvisited);
278        }
279      } else {
280        CountDownLatch latch = new CountDownLatch(tableDirs.size());
281        for (Path dir : tableDirs) {
282          executor.submit(new Runnable() {
283            @Override
284            public void run() {
285              try {
286                internalGet(dir, tds, allvisited);
287              } finally {
288                latch.countDown();
289              }
290            }
291          });
292        }
293        try {
294          latch.await();
295        } catch (InterruptedException ie) {
296          throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
297        }
298      }
299      fsvisited = allvisited.get();
300      LOG.info("Fetched table descriptors(size=" + tds.size() + ") cost "
301        + (EnvironmentEdgeManager.currentTime() - startTime) + "ms.");
302    }
303    return tds;
304  }
305
306  private void internalGet(Path dir, Map<String, TableDescriptor> tds, AtomicBoolean allvisited) {
307    TableDescriptor htd = get(CommonFSUtils.getTableName(dir));
308    if (htd == null) {
309      allvisited.set(false);
310    } else {
311      tds.put(htd.getTableName().getNameWithNamespaceInclAsString(), htd);
312    }
313  }
314
315  /**
316   * Find descriptors by namespace.
317   * @see #get(org.apache.hadoop.hbase.TableName)
318   */
319  @Override
320  public Map<String, TableDescriptor> getByNamespace(String name) throws IOException {
321    Map<String, TableDescriptor> htds = new TreeMap<>();
322    List<Path> tableDirs =
323      FSUtils.getLocalTableDirs(fs, CommonFSUtils.getNamespaceDir(rootdir, name));
324    for (Path d : tableDirs) {
325      TableDescriptor htd = get(CommonFSUtils.getTableName(d));
326      if (htd == null) {
327        continue;
328      }
329      htds.put(CommonFSUtils.getTableName(d).getNameAsString(), htd);
330    }
331    return htds;
332  }
333
334  @Override
335  public void update(TableDescriptor td, boolean cacheOnly) throws IOException {
336    // TODO: in fact this method will only be called at master side, so fsreadonly and usecache will
337    // always be true. In general, we'd better have a ReadOnlyFSTableDesciptors for HRegionServer
338    // but now, HMaster extends HRegionServer, so unless making use of generic, we can not have
339    // different implementations for HMaster and HRegionServer. Revisit this when we make HMaster
340    // not extend HRegionServer in the future.
341    if (fsreadonly) {
342      throw new UnsupportedOperationException("Cannot add a table descriptor - in read only mode");
343    }
344    if (!cacheOnly) {
345      updateTableDescriptor(td);
346    }
347    if (usecache) {
348      this.cache.put(td.getTableName(), td);
349    }
350  }
351
352  @RestrictedApi(explanation = "Should only be called in tests or self", link = "",
353      allowedOnPath = ".*/src/test/.*|.*/FSTableDescriptors\\.java")
354  Path updateTableDescriptor(TableDescriptor td) throws IOException {
355    TableName tableName = td.getTableName();
356    Path tableDir = getTableDir(tableName);
357    Path p = writeTableDescriptor(fs, td, tableDir,
358      getTableDescriptorFromFs(fs, tableDir, fsreadonly).map(Pair::getFirst).orElse(null));
359    if (p == null) {
360      throw new IOException("Failed update");
361    }
362    LOG.info("Updated tableinfo=" + p);
363    return p;
364  }
365
366  /**
367   * Removes the table descriptor from the local cache and returns it. If not in read only mode, it
368   * also deletes the entire table directory(!) from the FileSystem.
369   */
370  @Override
371  public TableDescriptor remove(final TableName tablename) throws IOException {
372    if (fsreadonly) {
373      throw new NotImplementedException("Cannot remove a table descriptor - in read only mode");
374    }
375    Path tabledir = getTableDir(tablename);
376    if (this.fs.exists(tabledir)) {
377      if (!this.fs.delete(tabledir, true)) {
378        throw new IOException("Failed delete of " + tabledir.toString());
379      }
380    }
381    TableDescriptor descriptor = this.cache.remove(tablename);
382    return descriptor;
383  }
384
385  /**
386   * Check whether we have a valid TableDescriptor.
387   */
388  public static boolean isTableDir(FileSystem fs, Path tableDir) throws IOException {
389    return getTableDescriptorFromFs(fs, tableDir, true).isPresent();
390  }
391
392  /**
393   * Compare {@link FileStatus} instances by {@link Path#getName()}. Returns in reverse order.
394   */
395  static final Comparator<FileStatus> TABLEINFO_FILESTATUS_COMPARATOR =
396    new Comparator<FileStatus>() {
397      @Override
398      public int compare(FileStatus left, FileStatus right) {
399        return right.getPath().getName().compareTo(left.getPath().getName());
400      }
401    };
402
403  /**
404   * Return the table directory in HDFS
405   */
406  private Path getTableDir(TableName tableName) {
407    return CommonFSUtils.getTableDir(rootdir, tableName);
408  }
409
410  private static final PathFilter TABLEINFO_PATHFILTER = new PathFilter() {
411    @Override
412    public boolean accept(Path p) {
413      // Accept any file that starts with TABLEINFO_NAME
414      return p.getName().startsWith(TABLEINFO_FILE_PREFIX);
415    }
416  };
417
  /**
   * Width of the sequenceid that is a suffix on a tableinfo file, i.e. the number of decimal digits
   * the sequence id is zero-padded to when a tableinfo file name is built.
   */
  static final int WIDTH_OF_SEQUENCE_ID = 10;
422
423  /**
424   * @param number Number to use as suffix.
425   * @return Returns zero-prefixed decimal version of passed number (Does absolute in case number is
426   *         negative).
427   */
428  private static String formatTableInfoSequenceId(final int number) {
429    byte[] b = new byte[WIDTH_OF_SEQUENCE_ID];
430    int d = Math.abs(number);
431    for (int i = b.length - 1; i >= 0; i--) {
432      b[i] = (byte) ((d % 10) + '0');
433      d /= 10;
434    }
435    return Bytes.toString(b);
436  }
437
438  @Override
439  public void close() throws IOException {
440    // Close the executor when parallel loading enabled.
441    if (tableDescriptorParallelLoadEnable) {
442      this.executor.shutdown();
443    }
444  }
445
446  static final class SequenceIdAndFileLength {
447
448    final int sequenceId;
449
450    final int fileLength;
451
452    SequenceIdAndFileLength(int sequenceId, int fileLength) {
453      this.sequenceId = sequenceId;
454      this.fileLength = fileLength;
455    }
456  }
457
458  /**
459   * Returns the current sequence id and file length or 0 if none found.
460   * @param p Path to a <code>.tableinfo</code> file.
461   */
462  @RestrictedApi(explanation = "Should only be called in tests or self", link = "",
463      allowedOnPath = ".*/src/test/.*|.*/FSTableDescriptors\\.java")
464  static SequenceIdAndFileLength getTableInfoSequenceIdAndFileLength(Path p) {
465    String name = p.getName();
466    if (!name.startsWith(TABLEINFO_FILE_PREFIX)) {
467      throw new IllegalArgumentException("Invalid table descriptor file name: " + name);
468    }
469    int firstDot = name.indexOf('.', TABLEINFO_FILE_PREFIX.length());
470    if (firstDot < 0) {
471      // oldest style where we do not have both sequence id and file length
472      return new SequenceIdAndFileLength(0, 0);
473    }
474    int secondDot = name.indexOf('.', firstDot + 1);
475    if (secondDot < 0) {
476      // old stype where we do not have file length
477      int sequenceId = Integer.parseInt(name.substring(firstDot + 1));
478      return new SequenceIdAndFileLength(sequenceId, 0);
479    }
480    int sequenceId = Integer.parseInt(name.substring(firstDot + 1, secondDot));
481    int fileLength = Integer.parseInt(name.substring(secondDot + 1));
482    return new SequenceIdAndFileLength(sequenceId, fileLength);
483  }
484
485  /**
486   * Returns Name of tableinfo file.
487   */
488  @RestrictedApi(explanation = "Should only be called in tests or self", link = "",
489      allowedOnPath = ".*/src/test/.*|.*/FSTableDescriptors\\.java")
490  static String getTableInfoFileName(int sequenceId, byte[] content) {
491    return TABLEINFO_FILE_PREFIX + "." + formatTableInfoSequenceId(sequenceId) + "."
492      + content.length;
493  }
494
495  /**
496   * Returns the latest table descriptor for the given table directly from the file system if it
497   * exists, bypassing the local cache. Returns null if it's not found.
498   */
499  public static TableDescriptor getTableDescriptorFromFs(FileSystem fs, Path hbaseRootDir,
500    TableName tableName) throws IOException {
501    Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
502    return getTableDescriptorFromFs(fs, tableDir);
503  }
504
505  /**
506   * Returns the latest table descriptor for the table located at the given directory directly from
507   * the file system if it exists.
508   */
509  public static TableDescriptor getTableDescriptorFromFs(FileSystem fs, Path tableDir)
510    throws IOException {
511    return getTableDescriptorFromFs(fs, tableDir, true).map(Pair::getSecond).orElse(null);
512  }
513
514  private static void deleteMalformedFile(FileSystem fs, Path file) throws IOException {
515    LOG.info("Delete malformed table descriptor file {}", file);
516    if (!fs.delete(file, false)) {
517      LOG.warn("Failed to delete malformed table descriptor file {}", file);
518    }
519  }
520
  /**
   * Loads the newest readable table descriptor from the {@link #TABLEINFO_DIR} subdirectory of the
   * given table directory.
   * <p/>
   * Candidate files are sorted with {@link #TABLEINFO_FILESTATUS_COMPARATOR} and tried in that
   * order; a file that hits EOF while being read or that fails to parse is skipped. When
   * {@code readonly} is {@code false}, skipped files and every file sorted after the one that was
   * loaded are deleted.
   * @param fs       filesystem to read from
   * @param tableDir table directory containing the {@link #TABLEINFO_DIR} subdirectory
   * @param readonly if {@code true}, never delete anything
   * @return the status of the file that was loaded together with the parsed descriptor, or empty if
   *         there are no descriptor files or none could be loaded
   * @throws IOException if a filesystem operation fails (an EOF while reading a file is handled by
   *                     skipping that file)
   */
  private static Optional<Pair<FileStatus, TableDescriptor>> getTableDescriptorFromFs(FileSystem fs,
    Path tableDir, boolean readonly) throws IOException {
    Path tableInfoDir = new Path(tableDir, TABLEINFO_DIR);
    FileStatus[] descFiles = CommonFSUtils.listStatus(fs, tableInfoDir, TABLEINFO_PATHFILTER);
    if (descFiles == null || descFiles.length < 1) {
      return Optional.empty();
    }
    // Reverse name order: the first entry is the first candidate to try.
    Arrays.sort(descFiles, TABLEINFO_FILESTATUS_COMPARATOR);
    // i is declared outside the loop because the cleanup loop below continues from it.
    int i = 0;
    TableDescriptor td = null;
    FileStatus descFile = null;
    for (; i < descFiles.length; i++) {
      descFile = descFiles[i];
      Path file = descFile.getPath();
      // get file length from file name if present
      int fileLength = getTableInfoSequenceIdAndFileLength(file).fileLength;
      // otherwise fall back to the length reported by the filesystem
      byte[] content = new byte[fileLength > 0 ? fileLength : Ints.checkedCast(descFile.getLen())];
      try (FSDataInputStream in = fs.open(file)) {
        in.readFully(content);
      } catch (EOFException e) {
        LOG.info("Failed to load file {} due to EOF, it should be half written: {}", file,
          e.toString());
        if (!readonly) {
          deleteMalformedFile(fs, file);
        }
        // try the next candidate
        continue;
      }
      try {
        td = TableDescriptorBuilder.parseFrom(content);
        // first file that parses wins; i now points at it
        break;
      } catch (DeserializationException e) {
        LOG.info("Failed to parse file {} due to malformed protobuf message: {}", file,
          e.toString());
        if (!readonly) {
          deleteMalformedFile(fs, file);
        }
      }
    }
    if (!readonly) {
      // i + 1 to skip the one we load
      // (if nothing was loaded, i == descFiles.length here and this loop does nothing)
      for (i = i + 1; i < descFiles.length; i++) {
        Path file = descFiles[i].getPath();
        LOG.info("Delete old table descriptor file {}", file);
        if (!fs.delete(file, false)) {
          LOG.info("Failed to delete old table descriptor file {}", file);
        }
      }
    }
    // descFile is only meaningful when td was parsed from it
    return td != null ? Optional.of(Pair.newPair(descFile, td)) : Optional.empty();
  }
571
572  @RestrictedApi(explanation = "Should only be called in tests", link = "",
573      allowedOnPath = ".*/src/test/.*")
574  public static void deleteTableDescriptors(FileSystem fs, Path tableDir) throws IOException {
575    Path tableInfoDir = new Path(tableDir, TABLEINFO_DIR);
576    deleteTableDescriptorFiles(fs, tableInfoDir, Integer.MAX_VALUE);
577  }
578
579  /**
580   * Deletes files matching the table info file pattern within the given directory whose sequenceId
581   * is at most the given max sequenceId.
582   */
583  private static void deleteTableDescriptorFiles(FileSystem fs, Path dir, int maxSequenceId)
584    throws IOException {
585    FileStatus[] status = CommonFSUtils.listStatus(fs, dir, TABLEINFO_PATHFILTER);
586    for (FileStatus file : status) {
587      Path path = file.getPath();
588      int sequenceId = getTableInfoSequenceIdAndFileLength(path).sequenceId;
589      if (sequenceId <= maxSequenceId) {
590        boolean success = CommonFSUtils.delete(fs, path, false);
591        if (success) {
592          LOG.debug("Deleted {}", path);
593        } else {
594          LOG.error("Failed to delete table descriptor at {}", path);
595        }
596      }
597    }
598  }
599
600  /**
601   * Attempts to write a new table descriptor to the given table's directory. It begins at the
602   * currentSequenceId + 1 and tries 10 times to find a new sequence number not already in use.
603   * <p/>
604   * Removes the current descriptor file if passed in.
605   * @return Descriptor file or null if we failed write.
606   */
607  private static Path writeTableDescriptor(final FileSystem fs, final TableDescriptor td,
608    final Path tableDir, final FileStatus currentDescriptorFile) throws IOException {
609    // Here we will write to the final directory directly to avoid renaming as on OSS renaming is
610    // not atomic and has performance issue. The reason why we could do this is that, in the below
611    // code we will not overwrite existing files, we will write a new file instead. And when
612    // loading, we will skip the half written file, please see the code in getTableDescriptorFromFs
613    Path tableInfoDir = new Path(tableDir, TABLEINFO_DIR);
614
615    // In proc v2 we have table lock so typically, there will be no concurrent writes. Keep the
616    // retry logic here since we may still want to write the table descriptor from for example,
617    // HBCK2?
618    int currentSequenceId = currentDescriptorFile == null
619      ? 0
620      : getTableInfoSequenceIdAndFileLength(currentDescriptorFile.getPath()).sequenceId;
621
622    // Put arbitrary upperbound on how often we retry
623    int maxAttempts = 10;
624    int maxSequenceId = currentSequenceId + maxAttempts;
625    byte[] bytes = TableDescriptorBuilder.toByteArray(td);
626    for (int newSequenceId = currentSequenceId + 1; newSequenceId
627        <= maxSequenceId; newSequenceId++) {
628      String fileName = getTableInfoFileName(newSequenceId, bytes);
629      Path filePath = new Path(tableInfoDir, fileName);
630      try (FSDataOutputStream out = fs.create(filePath, false)) {
631        out.write(bytes);
632      } catch (FileAlreadyExistsException e) {
633        LOG.debug("{} exists; retrying up to {} times", filePath, maxAttempts, e);
634        continue;
635      } catch (IOException e) {
636        LOG.debug("Failed write {}; retrying up to {} times", filePath, maxAttempts, e);
637        continue;
638      }
639      deleteTableDescriptorFiles(fs, tableInfoDir, newSequenceId - 1);
640      return filePath;
641    }
642    return null;
643  }
644
645  /**
646   * Create new TableDescriptor in HDFS. Happens when we are creating table. Used by tests.
647   * @return True if we successfully created file.
648   */
649  public boolean createTableDescriptor(TableDescriptor htd) throws IOException {
650    return createTableDescriptor(htd, false);
651  }
652
653  /**
654   * Create new TableDescriptor in HDFS. Happens when we are creating table. If forceCreation is
655   * true then even if previous table descriptor is present it will be overwritten
656   * @return True if we successfully created file.
657   */
658  public boolean createTableDescriptor(TableDescriptor htd, boolean forceCreation)
659    throws IOException {
660    Path tableDir = getTableDir(htd.getTableName());
661    return createTableDescriptorForTableDirectory(tableDir, htd, forceCreation);
662  }
663
664  /**
665   * Create a new TableDescriptor in HDFS in the specified table directory. Happens when we create a
666   * new table during cluster start or in Clone and Create Table Procedures. Checks readOnly flag
667   * passed on construction.
668   * @param tableDir      table directory under which we should write the file
669   * @param htd           description of the table to write
670   * @param forceCreation if <tt>true</tt>,then even if previous table descriptor is present it will
671   *                      be overwritten
672   * @return <tt>true</tt> if we successfully created the file, <tt>false</tt> if the file already
673   *         exists, and we weren't forcing the descriptor creation.
674   * @throws IOException if a filesystem error occurs
675   */
676  public boolean createTableDescriptorForTableDirectory(Path tableDir, TableDescriptor htd,
677    boolean forceCreation) throws IOException {
678    if (this.fsreadonly) {
679      throw new NotImplementedException("Cannot create a table descriptor - in read only mode");
680    }
681    return createTableDescriptorForTableDirectory(this.fs, tableDir, htd, forceCreation);
682  }
683
684  /**
685   * Create a new TableDescriptor in HDFS in the specified table directory. Happens when we create a
686   * new table snapshoting. Does not enforce read-only. That is for caller to determine.
687   * @param fs            Filesystem to use.
688   * @param tableDir      table directory under which we should write the file
689   * @param htd           description of the table to write
690   * @param forceCreation if <tt>true</tt>,then even if previous table descriptor is present it will
691   *                      be overwritten
692   * @return <tt>true</tt> if we successfully created the file, <tt>false</tt> if the file already
693   *         exists, and we weren't forcing the descriptor creation.
694   * @throws IOException if a filesystem error occurs
695   */
696  public static boolean createTableDescriptorForTableDirectory(FileSystem fs, Path tableDir,
697    TableDescriptor htd, boolean forceCreation) throws IOException {
698    Optional<Pair<FileStatus, TableDescriptor>> opt = getTableDescriptorFromFs(fs, tableDir, false);
699    if (opt.isPresent()) {
700      LOG.debug("Current path={}", opt.get().getFirst());
701      if (!forceCreation) {
702        if (htd.equals(opt.get().getSecond())) {
703          LOG.trace("TableInfo already exists.. Skipping creation");
704          return false;
705        }
706      }
707    }
708    return writeTableDescriptor(fs, htd, tableDir, opt.map(Pair::getFirst).orElse(null)) != null;
709  }
710
  /**
   * Invalidates the table descriptor cache: marks the filesystem as not fully visited and drops
   * every cached descriptor, so later lookups go back to the filesystem.
   */
  @Override
  public void invalidateTableDescriptorCache() {
    LOG.info("Invalidating table descriptor cache.");
    // Reset the flag before clearing: get() returns null on a cache miss while fsvisited is true.
    this.fsvisited = false;
    this.cache.clear();
  }
720}