/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import com.google.errorprone.annotations.RestrictedApi;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Implementation of {@link TableDescriptors} that reads descriptors from the passed filesystem. It
 * expects descriptors to be in a file in the {@link #TABLEINFO_DIR} subdir of the table's directory
 * in FS. Can be read-only, i.e. it does not modify the filesystem, or it can be read-write.
 * <p>
 * Also has utilities for maintaining the table descriptor's tableinfo file. The table schema file
 * is kept in the {@link #TABLEINFO_DIR} subdir of the table directory in the filesystem. It has a
 * {@link #TABLEINFO_FILE_PREFIX} and then a suffix that is the edit sequenceid: e.g.
 * <code>.tableinfo.0000000003</code>. This sequenceid is always increasing. It starts at zero. The
 * table schema file with the highest sequenceid has the most recent schema edit. Usually there is
 * only one file, the most recent, but there may be short periods where more than one file exists.
 * Old files are eventually cleaned up. The presumption is that there will not be many concurrent
 * clients making table schema edits; if there are, the below needs a bit of reworking and perhaps
 * some supporting api in hdfs.
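 * <p>
 * For example, a minimal read-only usage sketch (assumes a {@code Configuration} whose
 * {@code hbase.rootdir} points at an existing HBase root directory; the table name is
 * illustrative):
 *
 * <pre>
 * FSTableDescriptors fstd = new FSTableDescriptors(conf);
 * TableDescriptor td = fstd.get(TableName.valueOf("my_table"));
 * Map&lt;String, TableDescriptor&gt; all = fstd.getAll();
 * </pre>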
 */
@InterfaceAudience.Private
public class FSTableDescriptors implements TableDescriptors {
  private static final Logger LOG = LoggerFactory.getLogger(FSTableDescriptors.class);
  private final FileSystem fs;
  private final Path rootdir;
  private final boolean fsreadonly;
  private final boolean usecache;
  private volatile boolean fsvisited;
  private boolean tableDescriptorParallelLoadEnable = false;
  private ThreadPoolExecutor executor;

  long cachehits = 0;
  long invocations = 0;

  /**
   * The file name prefix used to store HTD in HDFS
   */
  static final String TABLEINFO_FILE_PREFIX = ".tableinfo";
  public static final String TABLEINFO_DIR = ".tabledesc";

  // This cache does not age out old entries. The thinking is that the amount of data we keep
  // in here is so small that there is no need for an occasional purge.
  // TODO.
  private final Map<TableName, TableDescriptor> cache = new ConcurrentHashMap<>();

  /**
   * Construct a FSTableDescriptors instance using the hbase root dir of the given conf and the
   * filesystem where that root dir lives. This instance can do write operations (is not read only).
   */
  public FSTableDescriptors(final Configuration conf) throws IOException {
    this(CommonFSUtils.getCurrentFileSystem(conf), CommonFSUtils.getRootDir(conf));
  }

  public FSTableDescriptors(final FileSystem fs, final Path rootdir) {
    this(fs, rootdir, false, true);
  }

  public FSTableDescriptors(final FileSystem fs, final Path rootdir, final boolean fsreadonly,
    final boolean usecache) {
    this(fs, rootdir, fsreadonly, usecache, 0);
  }

  public FSTableDescriptors(final FileSystem fs, final Path rootdir, final boolean fsreadonly,
    final boolean usecache, final int tableDescriptorParallelLoadThreads) {
    this.fs = fs;
    this.rootdir = rootdir;
    this.fsreadonly = fsreadonly;
    this.usecache = usecache;
    if (tableDescriptorParallelLoadThreads > 0) {
      tableDescriptorParallelLoadEnable = true;
      executor = new ThreadPoolExecutor(tableDescriptorParallelLoadThreads,
        tableDescriptorParallelLoadThreads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
        new ThreadFactoryBuilder().setNameFormat("FSTableDescriptorLoad-pool-%d").setDaemon(true)
          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
      executor.allowCoreThreadTimeOut(true);
    }
  }

  public static void tryUpdateMetaTableDescriptor(Configuration conf) throws IOException {
    tryUpdateAndGetMetaTableDescriptor(conf, CommonFSUtils.getCurrentFileSystem(conf),
      CommonFSUtils.getRootDir(conf));
  }

  public static TableDescriptor tryUpdateAndGetMetaTableDescriptor(Configuration conf,
    FileSystem fs, Path rootdir) throws IOException {
    // See if we already have the meta table descriptor on the filesystem. Write one if not.
    Optional<Pair<FileStatus, TableDescriptor>> opt = getTableDescriptorFromFs(fs,
      CommonFSUtils.getTableDir(rootdir, TableName.META_TABLE_NAME), false);
    if (opt.isPresent()) {
      return opt.get().getSecond();
    }
    TableDescriptorBuilder builder = createMetaTableDescriptorBuilder(conf);
    TableDescriptor td = StoreFileTrackerFactory.updateWithTrackerConfigs(conf, builder.build());
    LOG.info("Creating new hbase:meta table descriptor {}", td);
    TableName tableName = td.getTableName();
    Path tableDir = CommonFSUtils.getTableDir(rootdir, tableName);
    Path p = writeTableDescriptor(fs, td, tableDir, null);
    if (p == null) {
      throw new IOException("Failed to update hbase:meta table descriptor");
    }
    LOG.info("Updated hbase:meta table descriptor to {}", p);
    return td;
  }

  public static ColumnFamilyDescriptor getTableFamilyDescForMeta(final Configuration conf) {
    return ColumnFamilyDescriptorBuilder.newBuilder(HConstants.TABLE_FAMILY)
      .setMaxVersions(
        conf.getInt(HConstants.HBASE_META_VERSIONS, HConstants.DEFAULT_HBASE_META_VERSIONS))
      .setInMemory(true).setBlocksize(8 * 1024).setScope(HConstants.REPLICATION_SCOPE_LOCAL)
      .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1).setBloomFilterType(BloomType.ROWCOL)
      .build();
  }

  public static ColumnFamilyDescriptor getReplBarrierFamilyDescForMeta() {
    return ColumnFamilyDescriptorBuilder.newBuilder(HConstants.REPLICATION_BARRIER_FAMILY)
      .setMaxVersions(HConstants.ALL_VERSIONS).setInMemory(true)
      .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
      .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1).setBloomFilterType(BloomType.ROWCOL)
      .build();
  }

  public static ColumnFamilyDescriptor getNamespaceFamilyDescForMeta(Configuration conf) {
    return ColumnFamilyDescriptorBuilder.newBuilder(HConstants.NAMESPACE_FAMILY)
      .setMaxVersions(
        conf.getInt(HConstants.HBASE_META_VERSIONS, HConstants.DEFAULT_HBASE_META_VERSIONS))
      .setInMemory(true)
      .setBlocksize(
        conf.getInt(HConstants.HBASE_META_BLOCK_SIZE, HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
      .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
      .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1).setBloomFilterType(BloomType.ROWCOL)
      .build();
  }

  private static TableDescriptorBuilder createMetaTableDescriptorBuilder(final Configuration conf)
    throws IOException {
    // TODO We used to set CacheDataInL1 for the META table. Now that we have BucketCache in file
    // mode, the META table data goes to the file-mode BC only. Test how that affects the system.
    // If the impact is too big, we have to rethink adding back setCacheDataInL1 for the META
    // table CFs.
    return TableDescriptorBuilder.newBuilder(TableName.META_TABLE_NAME)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.CATALOG_FAMILY)
        .setMaxVersions(
          conf.getInt(HConstants.HBASE_META_VERSIONS, HConstants.DEFAULT_HBASE_META_VERSIONS))
        .setInMemory(true)
        .setBlocksize(
          conf.getInt(HConstants.HBASE_META_BLOCK_SIZE, HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROWCOL)
        .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1).build())
      .setColumnFamily(getTableFamilyDescForMeta(conf))
      .setColumnFamily(getReplBarrierFamilyDescForMeta())
      .setColumnFamily(getNamespaceFamilyDescForMeta(conf)).setCoprocessor(
        CoprocessorDescriptorBuilder.newBuilder(MultiRowMutationEndpoint.class.getName())
          .setPriority(Coprocessor.PRIORITY_SYSTEM).build());
  }

  protected boolean isUsecache() {
    return this.usecache;
  }

  /**
   * Get the current table descriptor for the given table, or null if none exists.
   * <p/>
   * Uses a local cache of the descriptor but still checks the filesystem on each call if
   * {@link #fsvisited} is not {@code true}, i.e., we haven't done a full scan yet, to see if a
   * newer file has been created since the cached one was read.
   */
  @Override
  @Nullable
  public TableDescriptor get(TableName tableName) {
    invocations++;
    if (usecache) {
      // Look in cache of descriptors.
      TableDescriptor cachedtdm = this.cache.get(tableName);
      if (cachedtdm != null) {
        cachehits++;
        return cachedtdm;
      }
      // We do not need to go to the fs any more.
      if (fsvisited) {
        return null;
      }
    }
    TableDescriptor tdmt = null;
    try {
      tdmt = getTableDescriptorFromFs(fs, getTableDir(tableName), fsreadonly).map(Pair::getSecond)
        .orElse(null);
    } catch (IOException ioe) {
      LOG.debug("Exception during readTableDescriptor. Current table name = " + tableName, ioe);
    }
    // Last HTD written wins.
    if (usecache && tdmt != null) {
      this.cache.put(tableName, tdmt);
    }

    return tdmt;
  }

  /**
   * Returns a map from table name to table descriptor for all tables.
   */
  @Override
  public Map<String, TableDescriptor> getAll() throws IOException {
    Map<String, TableDescriptor> tds = new ConcurrentSkipListMap<>();
    if (fsvisited) {
      for (Map.Entry<TableName, TableDescriptor> entry : this.cache.entrySet()) {
        tds.put(entry.getKey().getNameWithNamespaceInclAsString(), entry.getValue());
      }
    } else {
      LOG.info("Fetching table descriptors from the filesystem.");
      final long startTime = EnvironmentEdgeManager.currentTime();
      AtomicBoolean allvisited = new AtomicBoolean(usecache);
      List<Path> tableDirs = FSUtils.getTableDirs(fs, rootdir);
      if (!tableDescriptorParallelLoadEnable) {
        for (Path dir : tableDirs) {
          internalGet(dir, tds, allvisited);
        }
      } else {
        CountDownLatch latch = new CountDownLatch(tableDirs.size());
        for (Path dir : tableDirs) {
          executor.submit(new Runnable() {
            @Override
            public void run() {
              try {
                internalGet(dir, tds, allvisited);
              } finally {
                latch.countDown();
              }
            }
          });
        }
        try {
          latch.await();
        } catch (InterruptedException ie) {
          throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
        }
      }
      fsvisited = allvisited.get();
      LOG.info("Fetched table descriptors(size=" + tds.size() + ") cost "
        + (EnvironmentEdgeManager.currentTime() - startTime) + "ms.");
    }
    return tds;
  }

  private void internalGet(Path dir, Map<String, TableDescriptor> tds, AtomicBoolean allvisited) {
    TableDescriptor htd = get(CommonFSUtils.getTableName(dir));
    if (htd == null) {
      allvisited.set(false);
    } else {
      tds.put(htd.getTableName().getNameWithNamespaceInclAsString(), htd);
    }
  }

  /**
   * Find descriptors by namespace.
   * @see #get(org.apache.hadoop.hbase.TableName)
   */
  @Override
  public Map<String, TableDescriptor> getByNamespace(String name) throws IOException {
    Map<String, TableDescriptor> htds = new TreeMap<>();
    List<Path> tableDirs =
      FSUtils.getLocalTableDirs(fs, CommonFSUtils.getNamespaceDir(rootdir, name));
    for (Path d : tableDirs) {
      TableDescriptor htd = get(CommonFSUtils.getTableName(d));
      if (htd == null) {
        continue;
      }
      htds.put(CommonFSUtils.getTableName(d).getNameAsString(), htd);
    }
    return htds;
  }

  @Override
  public void update(TableDescriptor td, boolean cacheOnly) throws IOException {
    // TODO: in fact this method will only be called on the master side, so fsreadonly and usecache
    // will always be true. In general, we'd better have a ReadOnlyFSTableDescriptors for
    // HRegionServer but, for now, HMaster extends HRegionServer, so unless we make use of
    // generics, we cannot have different implementations for HMaster and HRegionServer. Revisit
    // this when we make HMaster not extend HRegionServer in the future.
    if (fsreadonly) {
      throw new UnsupportedOperationException("Cannot add a table descriptor - in read only mode");
    }
    if (!cacheOnly) {
      updateTableDescriptor(td);
    }
    if (usecache) {
      this.cache.put(td.getTableName(), td);
    }
  }

  @RestrictedApi(explanation = "Should only be called in tests or self", link = "",
      allowedOnPath = ".*/src/test/.*|.*/FSTableDescriptors\\.java")
  Path updateTableDescriptor(TableDescriptor td) throws IOException {
    TableName tableName = td.getTableName();
    Path tableDir = getTableDir(tableName);
    Path p = writeTableDescriptor(fs, td, tableDir,
      getTableDescriptorFromFs(fs, tableDir, fsreadonly).map(Pair::getFirst).orElse(null));
    if (p == null) {
      throw new IOException("Failed update");
    }
    LOG.info("Updated tableinfo=" + p);
    return p;
  }

  /**
   * Removes the table descriptor from the local cache and returns it. If not in read only mode, it
   * also deletes the entire table directory(!) from the FileSystem.
   */
  @Override
  public TableDescriptor remove(final TableName tablename) throws IOException {
    if (fsreadonly) {
      throw new NotImplementedException("Cannot remove a table descriptor - in read only mode");
    }
    Path tabledir = getTableDir(tablename);
    if (this.fs.exists(tabledir)) {
      if (!this.fs.delete(tabledir, true)) {
        throw new IOException("Failed delete of " + tabledir.toString());
      }
    }
    TableDescriptor descriptor = this.cache.remove(tablename);
    return descriptor;
  }

  /**
   * Checks whether the given table directory contains a valid table descriptor.
   */
  public static boolean isTableDir(FileSystem fs, Path tableDir) throws IOException {
    return getTableDescriptorFromFs(fs, tableDir, true).isPresent();
  }

  /**
   * Compare {@link FileStatus} instances by {@link Path#getName()}. Returns in reverse order.
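   * Because tableinfo file names embed a fixed-width, zero-padded sequence id, sorting with this
   * comparator puts the newest descriptor file first, e.g. <code>.tableinfo.0000000004</code>
   * sorts before <code>.tableinfo.0000000003</code>.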
   */
  static final Comparator<FileStatus> TABLEINFO_FILESTATUS_COMPARATOR =
    new Comparator<FileStatus>() {
      @Override
      public int compare(FileStatus left, FileStatus right) {
        return right.getPath().getName().compareTo(left.getPath().getName());
      }
    };

  /**
   * Return the table directory in HDFS
   */
  private Path getTableDir(TableName tableName) {
    return CommonFSUtils.getTableDir(rootdir, tableName);
  }

  private static final PathFilter TABLEINFO_PATHFILTER = new PathFilter() {
    @Override
    public boolean accept(Path p) {
      // Accept any file that starts with TABLEINFO_FILE_PREFIX
      return p.getName().startsWith(TABLEINFO_FILE_PREFIX);
    }
  };

  /**
   * Width of the sequenceid that is a suffix on a tableinfo file.
   */
  static final int WIDTH_OF_SEQUENCE_ID = 10;

  /**
   * @param number Number to use as suffix.
   * @return Zero-padded decimal version of the passed number (uses the absolute value in case the
   *         number is negative).
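   *         For example, a sequence id of 3 is formatted as {@code "0000000003"}.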
   */
  private static String formatTableInfoSequenceId(final int number) {
    byte[] b = new byte[WIDTH_OF_SEQUENCE_ID];
    int d = Math.abs(number);
    for (int i = b.length - 1; i >= 0; i--) {
      b[i] = (byte) ((d % 10) + '0');
      d /= 10;
    }
    return Bytes.toString(b);
  }

  @Override
  public void close() throws IOException {
    // Close the executor when parallel loading is enabled.
    if (tableDescriptorParallelLoadEnable) {
      this.executor.shutdown();
    }
  }

  static final class SequenceIdAndFileLength {

    final int sequenceId;

    final int fileLength;

    SequenceIdAndFileLength(int sequenceId, int fileLength) {
      this.sequenceId = sequenceId;
      this.fileLength = fileLength;
    }
  }

  /**
   * Returns the current sequence id and file length, or 0 for each if not found.
   * @param p Path to a <code>.tableinfo</code> file.
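   * <p/>
   * Examples of the three file name formats this method handles (the file lengths shown are
   * illustrative):
   * <ul>
   * <li><code>.tableinfo</code> - oldest format; yields sequence id 0 and file length 0</li>
   * <li><code>.tableinfo.0000000003</code> - old format; yields sequence id 3 and file length 0</li>
   * <li><code>.tableinfo.0000000003.148</code> - current format; yields sequence id 3 and file
   * length 148</li>
   * </ul>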
   */
  @RestrictedApi(explanation = "Should only be called in tests or self", link = "",
      allowedOnPath = ".*/src/test/.*|.*/FSTableDescriptors\\.java")
  static SequenceIdAndFileLength getTableInfoSequenceIdAndFileLength(Path p) {
    String name = p.getName();
    if (!name.startsWith(TABLEINFO_FILE_PREFIX)) {
      throw new IllegalArgumentException("Invalid table descriptor file name: " + name);
    }
    int firstDot = name.indexOf('.', TABLEINFO_FILE_PREFIX.length());
    if (firstDot < 0) {
      // Oldest style, where we have neither sequence id nor file length.
      return new SequenceIdAndFileLength(0, 0);
    }
    int secondDot = name.indexOf('.', firstDot + 1);
    if (secondDot < 0) {
      // Old style, where we do not have the file length.
      int sequenceId = Integer.parseInt(name.substring(firstDot + 1));
      return new SequenceIdAndFileLength(sequenceId, 0);
    }
    int sequenceId = Integer.parseInt(name.substring(firstDot + 1, secondDot));
    int fileLength = Integer.parseInt(name.substring(secondDot + 1));
    return new SequenceIdAndFileLength(sequenceId, fileLength);
  }

  /**
   * Returns the name of the tableinfo file for the given sequence id and content.
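   * <p/>
   * For example, a sequence id of 3 and a 148-byte serialized descriptor (the length is
   * illustrative) yield <code>.tableinfo.0000000003.148</code>.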
   */
  @RestrictedApi(explanation = "Should only be called in tests or self", link = "",
      allowedOnPath = ".*/src/test/.*|.*/FSTableDescriptors\\.java")
  static String getTableInfoFileName(int sequenceId, byte[] content) {
    return TABLEINFO_FILE_PREFIX + "." + formatTableInfoSequenceId(sequenceId) + "."
      + content.length;
  }

  /**
   * Returns the latest table descriptor for the given table directly from the file system if it
   * exists, bypassing the local cache. Returns null if it's not found.
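   * <p/>
   * A minimal usage sketch (assumes {@code conf} carries a valid {@code hbase.rootdir}; the table
   * name is illustrative):
   *
   * <pre>
   * FileSystem fs = CommonFSUtils.getCurrentFileSystem(conf);
   * Path rootDir = CommonFSUtils.getRootDir(conf);
   * TableDescriptor td =
   *   FSTableDescriptors.getTableDescriptorFromFs(fs, rootDir, TableName.valueOf("my_table"));
   * </pre>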
   */
  public static TableDescriptor getTableDescriptorFromFs(FileSystem fs, Path hbaseRootDir,
    TableName tableName) throws IOException {
    Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
    return getTableDescriptorFromFs(fs, tableDir);
  }

  /**
   * Returns the latest table descriptor for the table located at the given directory directly from
   * the file system if it exists.
   */
  public static TableDescriptor getTableDescriptorFromFs(FileSystem fs, Path tableDir)
    throws IOException {
    return getTableDescriptorFromFs(fs, tableDir, true).map(Pair::getSecond).orElse(null);
  }

  private static void deleteMalformedFile(FileSystem fs, Path file) throws IOException {
    LOG.info("Delete malformed table descriptor file {}", file);
    if (!fs.delete(file, false)) {
      LOG.warn("Failed to delete malformed table descriptor file {}", file);
    }
  }

  private static Optional<Pair<FileStatus, TableDescriptor>> getTableDescriptorFromFs(FileSystem fs,
    Path tableDir, boolean readonly) throws IOException {
    Path tableInfoDir = new Path(tableDir, TABLEINFO_DIR);
    FileStatus[] descFiles = CommonFSUtils.listStatus(fs, tableInfoDir, TABLEINFO_PATHFILTER);
    if (descFiles == null || descFiles.length < 1) {
      return Optional.empty();
    }
    Arrays.sort(descFiles, TABLEINFO_FILESTATUS_COMPARATOR);
    int i = 0;
    TableDescriptor td = null;
    FileStatus descFile = null;
    for (; i < descFiles.length; i++) {
      descFile = descFiles[i];
      Path file = descFile.getPath();
      // Get the file length from the file name if present.
      int fileLength = getTableInfoSequenceIdAndFileLength(file).fileLength;
      byte[] content = new byte[fileLength > 0 ? fileLength : Ints.checkedCast(descFile.getLen())];
      try (FSDataInputStream in = fs.open(file)) {
        in.readFully(content);
      } catch (EOFException e) {
        LOG.info("Failed to load file {} due to EOF, it should be half written: {}", file,
          e.toString());
        if (!readonly) {
          deleteMalformedFile(fs, file);
        }
        continue;
      }
      try {
        td = TableDescriptorBuilder.parseFrom(content);
        break;
      } catch (DeserializationException e) {
        LOG.info("Failed to parse file {} due to malformed protobuf message: {}", file,
          e.toString());
        if (!readonly) {
          deleteMalformedFile(fs, file);
        }
      }
    }
    if (!readonly) {
      // i + 1 to skip the file we just loaded.
      for (i = i + 1; i < descFiles.length; i++) {
        Path file = descFiles[i].getPath();
        LOG.info("Delete old table descriptor file {}", file);
        if (!fs.delete(file, false)) {
          LOG.info("Failed to delete old table descriptor file {}", file);
        }
      }
    }
    return td != null ? Optional.of(Pair.newPair(descFile, td)) : Optional.empty();
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public static void deleteTableDescriptors(FileSystem fs, Path tableDir) throws IOException {
    Path tableInfoDir = new Path(tableDir, TABLEINFO_DIR);
    deleteTableDescriptorFiles(fs, tableInfoDir, Integer.MAX_VALUE);
  }

  /**
   * Deletes files matching the table info file pattern within the given directory whose sequenceId
   * is at most the given max sequenceId.
   */
  private static void deleteTableDescriptorFiles(FileSystem fs, Path dir, int maxSequenceId)
    throws IOException {
    FileStatus[] status = CommonFSUtils.listStatus(fs, dir, TABLEINFO_PATHFILTER);
    if (status == null) {
      // listStatus returns null when the directory is missing or has no matching files (see the
      // same check in getTableDescriptorFromFs); nothing to delete in that case.
      return;
    }
    for (FileStatus file : status) {
      Path path = file.getPath();
      int sequenceId = getTableInfoSequenceIdAndFileLength(path).sequenceId;
      if (sequenceId <= maxSequenceId) {
        boolean success = CommonFSUtils.delete(fs, path, false);
        if (success) {
          LOG.debug("Deleted {}", path);
        } else {
          LOG.error("Failed to delete table descriptor at {}", path);
        }
      }
    }
  }

  /**
   * Attempts to write a new table descriptor to the given table's directory. It begins at the
   * currentSequenceId + 1 and tries 10 times to find a new sequence number not already in use.
   * <p/>
   * Removes the current descriptor file if passed in.
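   * <p/>
   * For example, if the current descriptor file is <code>.tableinfo.0000000003.148</code>, the
   * first attempt writes a file named <code>.tableinfo.0000000004.&lt;length&gt;</code> and, on
   * success, deletes the older files (the length suffix here is illustrative).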
   * @return Descriptor file or null if the write failed.
   */
  private static Path writeTableDescriptor(final FileSystem fs, final TableDescriptor td,
    final Path tableDir, final FileStatus currentDescriptorFile) throws IOException {
    // Here we will write to the final directory directly to avoid renaming, as on OSS renaming is
    // not atomic and has performance issues. The reason we can do this is that, in the code below,
    // we will not overwrite existing files; we will write a new file instead. And when loading, we
    // will skip half written files, please see the code in getTableDescriptorFromFs.
    Path tableInfoDir = new Path(tableDir, TABLEINFO_DIR);

    // In proc v2 we have a table lock, so typically there will be no concurrent writes. Keep the
    // retry logic here since we may still want to write the table descriptor from, for example,
    // HBCK2.
    int currentSequenceId = currentDescriptorFile == null
      ? 0
      : getTableInfoSequenceIdAndFileLength(currentDescriptorFile.getPath()).sequenceId;

    // Put an arbitrary upper bound on how many times we retry.
    int maxAttempts = 10;
    int maxSequenceId = currentSequenceId + maxAttempts;
    byte[] bytes = TableDescriptorBuilder.toByteArray(td);
    for (int newSequenceId = currentSequenceId + 1; newSequenceId
        <= maxSequenceId; newSequenceId++) {
      String fileName = getTableInfoFileName(newSequenceId, bytes);
      Path filePath = new Path(tableInfoDir, fileName);
      try (FSDataOutputStream out = fs.create(filePath, false)) {
        out.write(bytes);
      } catch (FileAlreadyExistsException e) {
        LOG.debug("{} exists; retrying up to {} times", filePath, maxAttempts, e);
        continue;
      } catch (IOException e) {
        LOG.debug("Failed write {}; retrying up to {} times", filePath, maxAttempts, e);
        continue;
      }
      deleteTableDescriptorFiles(fs, tableInfoDir, newSequenceId - 1);
      return filePath;
    }
    return null;
  }

  /**
   * Create a new TableDescriptor in HDFS. Happens when we are creating a table. Used by tests.
   * @return True if we successfully created the file.
   */
  public boolean createTableDescriptor(TableDescriptor htd) throws IOException {
    return createTableDescriptor(htd, false);
  }

  /**
   * Create a new TableDescriptor in HDFS. Happens when we are creating a table. If forceCreation
   * is true, then even if a previous table descriptor is present it will be overwritten.
   * @return True if we successfully created the file.
   */
  public boolean createTableDescriptor(TableDescriptor htd, boolean forceCreation)
    throws IOException {
    Path tableDir = getTableDir(htd.getTableName());
    return createTableDescriptorForTableDirectory(tableDir, htd, forceCreation);
  }

  /**
   * Create a new TableDescriptor in HDFS in the specified table directory. Happens when we create
   * a new table during cluster start or in the Clone and Create Table Procedures. Checks the
   * readOnly flag passed on construction.
   * @param tableDir      table directory under which we should write the file
   * @param htd           description of the table to write
   * @param forceCreation if <tt>true</tt>, then even if a previous table descriptor is present it
   *                      will be overwritten
   * @return <tt>true</tt> if we successfully created the file, <tt>false</tt> if the file already
   *         exists and we weren't forcing the descriptor creation.
   * @throws IOException if a filesystem error occurs
   */
  public boolean createTableDescriptorForTableDirectory(Path tableDir, TableDescriptor htd,
    boolean forceCreation) throws IOException {
    if (this.fsreadonly) {
      throw new NotImplementedException("Cannot create a table descriptor - in read only mode");
    }
    return createTableDescriptorForTableDirectory(this.fs, tableDir, htd, forceCreation);
  }

  /**
   * Create a new TableDescriptor in HDFS in the specified table directory. Happens when we create
   * a new table or snapshot a table. Does not enforce read-only; that is for the caller to
   * determine.
   * @param fs            Filesystem to use.
   * @param tableDir      table directory under which we should write the file
   * @param htd           description of the table to write
   * @param forceCreation if <tt>true</tt>, then even if a previous table descriptor is present it
   *                      will be overwritten
   * @return <tt>true</tt> if we successfully created the file, <tt>false</tt> if the file already
   *         exists and we weren't forcing the descriptor creation.
   * @throws IOException if a filesystem error occurs
   */
  public static boolean createTableDescriptorForTableDirectory(FileSystem fs, Path tableDir,
    TableDescriptor htd, boolean forceCreation) throws IOException {
    Optional<Pair<FileStatus, TableDescriptor>> opt = getTableDescriptorFromFs(fs, tableDir, false);
    if (opt.isPresent()) {
      LOG.debug("Current path={}", opt.get().getFirst());
      if (!forceCreation) {
        if (htd.equals(opt.get().getSecond())) {
          LOG.trace("TableInfo already exists. Skipping creation");
          return false;
        }
      }
    }
    return writeTableDescriptor(fs, htd, tableDir, opt.map(Pair::getFirst).orElse(null)) != null;
  }
}