/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import com.google.errorprone.annotations.RestrictedApi;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Implementation of {@link TableDescriptors} that reads descriptors from the passed filesystem. It
 * expects descriptors to be in a file in the {@link #TABLEINFO_DIR} subdir of the table's directory
 * in FS. Can be read-only (i.e. it does not modify the filesystem) or read-write.
 * <p>
 * Also has utilities for maintaining the table descriptor's tableinfo file. The table schema file
 * is kept in the {@link #TABLEINFO_DIR} subdir of the table directory in the filesystem. It has a
 * {@link #TABLEINFO_FILE_PREFIX} and then a suffix that is the edit sequenceid: e.g.
 * <code>.tableinfo.0000000003</code>. This sequenceid is always increasing; it starts at zero. The
 * table schema file with the highest sequenceid has the most recent schema edit. Usually there is
 * only one file, the most recent, but there may be short periods where more than one file exists.
 * Old files are eventually cleaned up. The presumption is that there will not be lots of concurrent
 * clients making table schema edits; if that changes, the below needs a bit of reworking and
 * perhaps some supporting api in hdfs.
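 * <p>
 * A minimal usage sketch (assumes a {@link Configuration} already pointing at the cluster's root
 * dir; error handling is omitted for brevity, and the table name is only an example):
 *
 * <pre>
 * {@code
 * TableDescriptors tds = new FSTableDescriptors(conf);
 * TableDescriptor td = tds.get(TableName.valueOf("example"));  // null if no descriptor on disk
 * Map<String, TableDescriptor> all = tds.getAll();             // scans the filesystem if not fully cached
 * }
 * </pre>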
 */
@InterfaceAudience.Private
public class FSTableDescriptors implements TableDescriptors {
  private static final Logger LOG = LoggerFactory.getLogger(FSTableDescriptors.class);
  private final FileSystem fs;
  private final Path rootdir;
  private final boolean fsreadonly;
  private final boolean usecache;
  private volatile boolean fsvisited;
  private boolean tableDescriptorParallelLoadEnable = false;
  private ThreadPoolExecutor executor;

  long cachehits = 0;
  long invocations = 0;

  /**
   * The file name prefix used to store HTD in HDFS
   */
  static final String TABLEINFO_FILE_PREFIX = ".tableinfo";
  public static final String TABLEINFO_DIR = ".tabledesc";

  // This cache does not age out the old stuff. Thinking is that the amount
  // of data we keep up in here is so small, no need to do occasional purge.
  // TODO.
  private final Map<TableName, TableDescriptor> cache = new ConcurrentHashMap<>();

  /**
   * Construct a FSTableDescriptors instance using the hbase root dir of the given conf and the
   * filesystem where that root dir lives. This instance can do write operations (is not read only).
   */
  public FSTableDescriptors(final Configuration conf) throws IOException {
    this(CommonFSUtils.getCurrentFileSystem(conf), CommonFSUtils.getRootDir(conf));
  }

  public FSTableDescriptors(final FileSystem fs, final Path rootdir) {
    this(fs, rootdir, false, true);
  }

  public FSTableDescriptors(final FileSystem fs, final Path rootdir, final boolean fsreadonly,
    final boolean usecache) {
    this(fs, rootdir, fsreadonly, usecache, 0);
  }

  public FSTableDescriptors(final FileSystem fs, final Path rootdir, final boolean fsreadonly,
    final boolean usecache, final int tableDescriptorParallelLoadThreads) {
    this.fs = fs;
    this.rootdir = rootdir;
    this.fsreadonly = fsreadonly;
    this.usecache = usecache;
    if (tableDescriptorParallelLoadThreads > 0) {
      tableDescriptorParallelLoadEnable = true;
      executor = new ThreadPoolExecutor(tableDescriptorParallelLoadThreads,
        tableDescriptorParallelLoadThreads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
        new ThreadFactoryBuilder().setNameFormat("FSTableDescriptorLoad-pool-%d").setDaemon(true)
          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
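      // Core threads are allowed to time out (see the 1 second keep-alive above), so the pool
      // shrinks back to zero threads once a parallel descriptor load completes.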
      executor.allowCoreThreadTimeOut(true);
    }
  }

  public static void tryUpdateMetaTableDescriptor(Configuration conf) throws IOException {
    tryUpdateAndGetMetaTableDescriptor(conf, CommonFSUtils.getCurrentFileSystem(conf),
      CommonFSUtils.getRootDir(conf));
  }

  public static TableDescriptor tryUpdateAndGetMetaTableDescriptor(Configuration conf,
    FileSystem fs, Path rootdir) throws IOException {
    // see if we already have meta descriptor on fs. Write one if not.
    Optional<Pair<FileStatus, TableDescriptor>> opt = getTableDescriptorFromFs(fs,
      CommonFSUtils.getTableDir(rootdir, TableName.META_TABLE_NAME), false);
    if (opt.isPresent()) {
      return opt.get().getSecond();
    }
    TableDescriptorBuilder builder = createMetaTableDescriptorBuilder(conf);
    TableDescriptor td = StoreFileTrackerFactory.updateWithTrackerConfigs(conf, builder.build());
    LOG.info("Creating new hbase:meta table descriptor {}", td);
    TableName tableName = td.getTableName();
    Path tableDir = CommonFSUtils.getTableDir(rootdir, tableName);
    Path p = writeTableDescriptor(fs, td, tableDir, null);
    if (p == null) {
      throw new IOException("Failed update hbase:meta table descriptor");
    }
    LOG.info("Updated hbase:meta table descriptor to {}", p);
    return td;
  }

  public static ColumnFamilyDescriptor getTableFamilyDescForMeta(final Configuration conf) {
    return ColumnFamilyDescriptorBuilder.newBuilder(HConstants.TABLE_FAMILY)
      .setMaxVersions(
        conf.getInt(HConstants.HBASE_META_VERSIONS, HConstants.DEFAULT_HBASE_META_VERSIONS))
      .setInMemory(true).setBlocksize(8 * 1024).setScope(HConstants.REPLICATION_SCOPE_LOCAL)
      .setDataBlockEncoding(org.apache.hadoop.hbase.io.encoding.DataBlockEncoding.ROW_INDEX_V1)
      .setBloomFilterType(BloomType.ROWCOL).build();
  }

  public static ColumnFamilyDescriptor getReplBarrierFamilyDescForMeta() {
    return ColumnFamilyDescriptorBuilder.newBuilder(HConstants.REPLICATION_BARRIER_FAMILY)
      .setMaxVersions(HConstants.ALL_VERSIONS).setInMemory(true)
      .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
      .setDataBlockEncoding(org.apache.hadoop.hbase.io.encoding.DataBlockEncoding.ROW_INDEX_V1)
      .setBloomFilterType(BloomType.ROWCOL).build();
  }

  private static TableDescriptorBuilder createMetaTableDescriptorBuilder(final Configuration conf)
    throws IOException {
    // TODO We used to set CacheDataInL1 for the META table. Now, when we have BucketCache in file
    // mode, the META table data goes to the file mode BC only. Test how that affects the system.
    // If the impact is too big, we have to rethink adding back setCacheDataInL1 for the META
    // table CFs.
    return TableDescriptorBuilder.newBuilder(TableName.META_TABLE_NAME)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.CATALOG_FAMILY)
        .setMaxVersions(
          conf.getInt(HConstants.HBASE_META_VERSIONS, HConstants.DEFAULT_HBASE_META_VERSIONS))
        .setInMemory(true)
        .setBlocksize(
          conf.getInt(HConstants.HBASE_META_BLOCK_SIZE, HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROWCOL)
        .setDataBlockEncoding(org.apache.hadoop.hbase.io.encoding.DataBlockEncoding.ROW_INDEX_V1)
        .build())
      .setColumnFamily(getTableFamilyDescForMeta(conf))
      .setColumnFamily(getReplBarrierFamilyDescForMeta())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.NAMESPACE_FAMILY)
        .setMaxVersions(
          conf.getInt(HConstants.HBASE_META_VERSIONS, HConstants.DEFAULT_HBASE_META_VERSIONS))
        .setInMemory(true)
        .setBlocksize(
          conf.getInt(HConstants.HBASE_META_BLOCK_SIZE, HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
        .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
        .setDataBlockEncoding(org.apache.hadoop.hbase.io.encoding.DataBlockEncoding.ROW_INDEX_V1)
        .setBloomFilterType(BloomType.ROWCOL).build())
      .setCoprocessor(
        CoprocessorDescriptorBuilder.newBuilder(MultiRowMutationEndpoint.class.getName())
          .setPriority(Coprocessor.PRIORITY_SYSTEM).build());
  }

  protected boolean isUsecache() {
    return this.usecache;
  }

  /**
   * Get the current table descriptor for the given table, or null if none exists.
   * <p/>
   * Uses a local cache of the descriptor but still checks the filesystem on each call if
   * {@link #fsvisited} is not {@code true}, i.e., we haven't done a full scan yet, to see if a
   * newer file has been created since the cached one was read.
   */
  @Override
  @Nullable
  public TableDescriptor get(TableName tableName) {
    invocations++;
    if (usecache) {
      // Look in cache of descriptors.
      TableDescriptor cachedtdm = this.cache.get(tableName);
      if (cachedtdm != null) {
        cachehits++;
        return cachedtdm;
      }
      // we do not need to go to fs any more
      if (fsvisited) {
        return null;
      }
    }
    TableDescriptor tdmt = null;
    try {
      tdmt = getTableDescriptorFromFs(fs, getTableDir(tableName), fsreadonly).map(Pair::getSecond)
        .orElse(null);
    } catch (IOException ioe) {
      LOG.debug("Exception during readTableDescriptor. Current table name = " + tableName, ioe);
    }
    // last HTD written wins
    if (usecache && tdmt != null) {
      this.cache.put(tableName, tdmt);
    }

    return tdmt;
  }

  /**
   * Returns a map from table name to table descriptor for all tables.
   */
  @Override
  public Map<String, TableDescriptor> getAll() throws IOException {
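    // Sorted by table name, and safe for concurrent puts from the parallel loader threads below.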
    Map<String, TableDescriptor> tds = new ConcurrentSkipListMap<>();
    if (fsvisited) {
      for (Map.Entry<TableName, TableDescriptor> entry : this.cache.entrySet()) {
        tds.put(entry.getKey().getNameWithNamespaceInclAsString(), entry.getValue());
      }
    } else {
      LOG.info("Fetching table descriptors from the filesystem.");
      final long startTime = EnvironmentEdgeManager.currentTime();
      AtomicBoolean allvisited = new AtomicBoolean(usecache);
      List<Path> tableDirs = FSUtils.getTableDirs(fs, rootdir);
      if (!tableDescriptorParallelLoadEnable) {
        for (Path dir : tableDirs) {
          internalGet(dir, tds, allvisited);
        }
      } else {
        CountDownLatch latch = new CountDownLatch(tableDirs.size());
        for (Path dir : tableDirs) {
          executor.submit(new Runnable() {
            @Override
            public void run() {
              try {
                internalGet(dir, tds, allvisited);
              } finally {
                latch.countDown();
              }
            }
          });
        }
        try {
          latch.await();
        } catch (InterruptedException ie) {
          throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
        }
      }
      fsvisited = allvisited.get();
      LOG.info("Fetched table descriptors(size=" + tds.size() + ") cost "
        + (EnvironmentEdgeManager.currentTime() - startTime) + "ms.");
    }
    return tds;
  }

  private void internalGet(Path dir, Map<String, TableDescriptor> tds, AtomicBoolean allvisited) {
    TableDescriptor htd = get(CommonFSUtils.getTableName(dir));
    if (htd == null) {
      allvisited.set(false);
    } else {
      tds.put(htd.getTableName().getNameWithNamespaceInclAsString(), htd);
    }
  }

  /**
   * Find descriptors by namespace.
   * @see #get(org.apache.hadoop.hbase.TableName)
   */
  @Override
  public Map<String, TableDescriptor> getByNamespace(String name) throws IOException {
    Map<String, TableDescriptor> htds = new TreeMap<>();
    List<Path> tableDirs =
      FSUtils.getLocalTableDirs(fs, CommonFSUtils.getNamespaceDir(rootdir, name));
    for (Path d : tableDirs) {
      TableDescriptor htd = get(CommonFSUtils.getTableName(d));
      if (htd == null) {
        continue;
      }
      htds.put(CommonFSUtils.getTableName(d).getNameAsString(), htd);
    }
    return htds;
  }

  @Override
  public void update(TableDescriptor td, boolean cacheOnly) throws IOException {
    // TODO: in fact this method will only be called at the master side, so fsreadonly will always
    // be false and usecache will always be true. In general, we'd better have a
    // ReadOnlyFSTableDescriptors for HRegionServer but now, HMaster extends HRegionServer, so
    // unless we make use of generics, we can not have different implementations for HMaster and
    // HRegionServer. Revisit this when we make HMaster not extend HRegionServer in the future.
    if (fsreadonly) {
      throw new UnsupportedOperationException("Cannot add a table descriptor - in read only mode");
    }
    if (!cacheOnly) {
      updateTableDescriptor(td);
    }
    if (usecache) {
      this.cache.put(td.getTableName(), td);
    }
  }

  @RestrictedApi(explanation = "Should only be called in tests or self", link = "",
      allowedOnPath = ".*/src/test/.*|.*/FSTableDescriptors\\.java")
  Path updateTableDescriptor(TableDescriptor td) throws IOException {
    TableName tableName = td.getTableName();
    Path tableDir = getTableDir(tableName);
    Path p = writeTableDescriptor(fs, td, tableDir,
      getTableDescriptorFromFs(fs, tableDir, fsreadonly).map(Pair::getFirst).orElse(null));
    if (p == null) {
      throw new IOException("Failed update");
    }
    LOG.info("Updated tableinfo=" + p);
    return p;
  }

  /**
   * Removes the table descriptor from the local cache and returns it. If not in read only mode, it
   * also deletes the entire table directory(!) from the FileSystem.
   */
  @Override
  public TableDescriptor remove(final TableName tablename) throws IOException {
    if (fsreadonly) {
      throw new NotImplementedException("Cannot remove a table descriptor - in read only mode");
    }
    Path tabledir = getTableDir(tablename);
    if (this.fs.exists(tabledir)) {
      if (!this.fs.delete(tabledir, true)) {
        throw new IOException("Failed delete of " + tabledir.toString());
      }
    }
    TableDescriptor descriptor = this.cache.remove(tablename);
    return descriptor;
  }

  /**
   * Check whether we have a valid TableDescriptor.
   */
  public static boolean isTableDir(FileSystem fs, Path tableDir) throws IOException {
    return getTableDescriptorFromFs(fs, tableDir, true).isPresent();
  }

  /**
   * Compare {@link FileStatus} instances by {@link Path#getName()}, in reverse order, so that the
   * newest tableinfo file (the one with the highest sequence id) sorts first.
   */
  static final Comparator<FileStatus> TABLEINFO_FILESTATUS_COMPARATOR =
    new Comparator<FileStatus>() {
      @Override
      public int compare(FileStatus left, FileStatus right) {
        return right.getPath().getName().compareTo(left.getPath().getName());
      }
    };

  /**
   * Return the table directory in HDFS
   */
  private Path getTableDir(TableName tableName) {
    return CommonFSUtils.getTableDir(rootdir, tableName);
  }

  private static final PathFilter TABLEINFO_PATHFILTER = new PathFilter() {
    @Override
    public boolean accept(Path p) {
      // Accept any file that starts with TABLEINFO_FILE_PREFIX
      return p.getName().startsWith(TABLEINFO_FILE_PREFIX);
    }
  };

  /**
   * Width of the sequenceid that is a suffix on a tableinfo file.
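   * The sequence id is zero-padded to this width so that the lexicographic ordering of tableinfo
   * file names matches the numeric ordering of their sequence ids.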
   */
  static final int WIDTH_OF_SEQUENCE_ID = 10;

  /**
   * @param number Number to use as suffix.
   * @return Zero-padded decimal version of the passed number (the absolute value is used if the
   *         number is negative).
   */
  private static String formatTableInfoSequenceId(final int number) {
    byte[] b = new byte[WIDTH_OF_SEQUENCE_ID];
    int d = Math.abs(number);
    for (int i = b.length - 1; i >= 0; i--) {
      b[i] = (byte) ((d % 10) + '0');
      d /= 10;
    }
    return Bytes.toString(b);
  }

  @Override
  public void close() throws IOException {
    // Close the executor when parallel loading enabled.
    if (tableDescriptorParallelLoadEnable) {
      this.executor.shutdown();
    }
  }

  static final class SequenceIdAndFileLength {

    final int sequenceId;

    final int fileLength;

    SequenceIdAndFileLength(int sequenceId, int fileLength) {
      this.sequenceId = sequenceId;
      this.fileLength = fileLength;
    }
  }

  /**
   * Returns the current sequence id and file length or 0 if none found.
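   * <p/>
   * For example, a file named <code>.tableinfo.0000000003.648</code> parses to sequence id 3 and
   * file length 648; older name formats without the file length (or without any suffix at all)
   * yield 0 for the missing fields.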
   * @param p Path to a <code>.tableinfo</code> file.
   */
  @RestrictedApi(explanation = "Should only be called in tests or self", link = "",
      allowedOnPath = ".*/src/test/.*|.*/FSTableDescriptors\\.java")
  static SequenceIdAndFileLength getTableInfoSequenceIdAndFileLength(Path p) {
    String name = p.getName();
    if (!name.startsWith(TABLEINFO_FILE_PREFIX)) {
      throw new IllegalArgumentException("Invalid table descriptor file name: " + name);
    }
    int firstDot = name.indexOf('.', TABLEINFO_FILE_PREFIX.length());
    if (firstDot < 0) {
      // oldest style where we have neither sequence id nor file length
      return new SequenceIdAndFileLength(0, 0);
    }
    int secondDot = name.indexOf('.', firstDot + 1);
    if (secondDot < 0) {
      // old style where we do not have the file length
      int sequenceId = Integer.parseInt(name.substring(firstDot + 1));
      return new SequenceIdAndFileLength(sequenceId, 0);
    }
    int sequenceId = Integer.parseInt(name.substring(firstDot + 1, secondDot));
    int fileLength = Integer.parseInt(name.substring(secondDot + 1));
    return new SequenceIdAndFileLength(sequenceId, fileLength);
  }

  /**
   * Returns the name of the tableinfo file.
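   * <p/>
   * For example, sequence id 3 and a 648-byte serialized descriptor produce
   * <code>.tableinfo.0000000003.648</code>; the embedded length is used when loading to detect
   * half-written files.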
   */
  @RestrictedApi(explanation = "Should only be called in tests or self", link = "",
      allowedOnPath = ".*/src/test/.*|.*/FSTableDescriptors\\.java")
  static String getTableInfoFileName(int sequenceId, byte[] content) {
    return TABLEINFO_FILE_PREFIX + "." + formatTableInfoSequenceId(sequenceId) + "."
      + content.length;
  }

  /**
   * Returns the latest table descriptor for the given table directly from the file system if it
   * exists, bypassing the local cache. Returns null if it's not found.
   */
  public static TableDescriptor getTableDescriptorFromFs(FileSystem fs, Path hbaseRootDir,
    TableName tableName) throws IOException {
    Path tableDir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
    return getTableDescriptorFromFs(fs, tableDir);
  }

  /**
   * Returns the latest table descriptor for the table located at the given directory directly from
   * the file system if it exists.
   */
  public static TableDescriptor getTableDescriptorFromFs(FileSystem fs, Path tableDir)
    throws IOException {
    return getTableDescriptorFromFs(fs, tableDir, true).map(Pair::getSecond).orElse(null);
  }

  private static void deleteMalformedFile(FileSystem fs, Path file) throws IOException {
    LOG.info("Delete malformed table descriptor file {}", file);
    if (!fs.delete(file, false)) {
      LOG.warn("Failed to delete malformed table descriptor file {}", file);
    }
  }

  private static Optional<Pair<FileStatus, TableDescriptor>> getTableDescriptorFromFs(FileSystem fs,
    Path tableDir, boolean readonly) throws IOException {
    Path tableInfoDir = new Path(tableDir, TABLEINFO_DIR);
    FileStatus[] descFiles = CommonFSUtils.listStatus(fs, tableInfoDir, TABLEINFO_PATHFILTER);
    if (descFiles == null || descFiles.length < 1) {
      return Optional.empty();
    }
    Arrays.sort(descFiles, TABLEINFO_FILESTATUS_COMPARATOR);
    int i = 0;
    TableDescriptor td = null;
    FileStatus descFile = null;
    for (; i < descFiles.length; i++) {
      descFile = descFiles[i];
      Path file = descFile.getPath();
      // get file length from file name if present
      int fileLength = getTableInfoSequenceIdAndFileLength(file).fileLength;
      byte[] content = new byte[fileLength > 0 ? fileLength : Ints.checkedCast(descFile.getLen())];
      try (FSDataInputStream in = fs.open(file)) {
        in.readFully(content);
      } catch (EOFException e) {
        LOG.info("Failed to load file {} due to EOF, it should be half written: {}", file,
          e.toString());
        if (!readonly) {
          deleteMalformedFile(fs, file);
        }
        continue;
      }
      try {
        td = TableDescriptorBuilder.parseFrom(content);
        break;
      } catch (DeserializationException e) {
        LOG.info("Failed to parse file {} due to malformed protobuf message: {}", file,
          e.toString());
        if (!readonly) {
          deleteMalformedFile(fs, file);
        }
      }
    }
    if (!readonly) {
      // i + 1 to skip the one we load
      for (i = i + 1; i < descFiles.length; i++) {
        Path file = descFiles[i].getPath();
        LOG.info("Delete old table descriptor file {}", file);
        if (!fs.delete(file, false)) {
          LOG.info("Failed to delete old table descriptor file {}", file);
        }
      }
    }
    return td != null ? Optional.of(Pair.newPair(descFile, td)) : Optional.empty();
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public static void deleteTableDescriptors(FileSystem fs, Path tableDir) throws IOException {
    Path tableInfoDir = new Path(tableDir, TABLEINFO_DIR);
    deleteTableDescriptorFiles(fs, tableInfoDir, Integer.MAX_VALUE);
  }

  /**
   * Deletes files matching the table info file pattern within the given directory whose sequenceId
   * is at most the given max sequenceId.
   */
  private static void deleteTableDescriptorFiles(FileSystem fs, Path dir, int maxSequenceId)
    throws IOException {
    FileStatus[] status = CommonFSUtils.listStatus(fs, dir, TABLEINFO_PATHFILTER);
    for (FileStatus file : status) {
      Path path = file.getPath();
      int sequenceId = getTableInfoSequenceIdAndFileLength(path).sequenceId;
      if (sequenceId <= maxSequenceId) {
        boolean success = CommonFSUtils.delete(fs, path, false);
        if (success) {
          LOG.debug("Deleted {}", path);
        } else {
          LOG.error("Failed to delete table descriptor at {}", path);
        }
      }
    }
  }

  /**
   * Attempts to write a new table descriptor to the given table's directory. It begins at the
   * currentSequenceId + 1 and tries 10 times to find a new sequence number not already in use.
   * <p/>
   * Removes the current descriptor file if passed in.
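   * <p/>
   * For example, if the current descriptor file is <code>.tableinfo.0000000004.932</code> (the
   * length suffix here is only illustrative), the first attempt writes a new
   * <code>.tableinfo.0000000005.&lt;length&gt;</code> file and, on success, deletes every tableinfo
   * file with a sequence id of 4 or lower.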
   * @return Descriptor file or null if we failed to write it.
   */
  private static Path writeTableDescriptor(final FileSystem fs, final TableDescriptor td,
    final Path tableDir, final FileStatus currentDescriptorFile) throws IOException {
    // Here we will write to the final directory directly to avoid renaming, as on OSS renaming is
    // not atomic and has performance issues. The reason we can do this is that, in the below code,
    // we will not overwrite existing files; we will write a new file instead. And when loading, we
    // will skip the half written file, please see the code in getTableDescriptorFromFs.
    Path tableInfoDir = new Path(tableDir, TABLEINFO_DIR);

    // In proc v2 we have a table lock, so typically there will be no concurrent writes. Keep the
    // retry logic here since we may still want to write the table descriptor from, for example,
    // HBCK2?
    int currentSequenceId = currentDescriptorFile == null
      ? 0
      : getTableInfoSequenceIdAndFileLength(currentDescriptorFile.getPath()).sequenceId;

    // Put arbitrary upperbound on how often we retry
    int maxAttempts = 10;
    int maxSequenceId = currentSequenceId + maxAttempts;
    byte[] bytes = TableDescriptorBuilder.toByteArray(td);
    for (int newSequenceId = currentSequenceId + 1; newSequenceId
        <= maxSequenceId; newSequenceId++) {
      String fileName = getTableInfoFileName(newSequenceId, bytes);
      Path filePath = new Path(tableInfoDir, fileName);
      try (FSDataOutputStream out = fs.create(filePath, false)) {
        out.write(bytes);
      } catch (FileAlreadyExistsException e) {
        LOG.debug("{} exists; retrying up to {} times", filePath, maxAttempts, e);
        continue;
      } catch (IOException e) {
        LOG.debug("Failed write {}; retrying up to {} times", filePath, maxAttempts, e);
        continue;
      }
      deleteTableDescriptorFiles(fs, tableInfoDir, newSequenceId - 1);
      return filePath;
    }
    return null;
  }

  /**
   * Create a new TableDescriptor in HDFS. Happens when we are creating a table. Used by tests.
   * @return True if we successfully created the file.
   */
  public boolean createTableDescriptor(TableDescriptor htd) throws IOException {
    return createTableDescriptor(htd, false);
  }

  /**
   * Create a new TableDescriptor in HDFS. Happens when we are creating a table. If forceCreation is
   * true, then even if a previous table descriptor is present it will be overwritten.
   * @return True if we successfully created the file.
   */
  public boolean createTableDescriptor(TableDescriptor htd, boolean forceCreation)
    throws IOException {
    Path tableDir = getTableDir(htd.getTableName());
    return createTableDescriptorForTableDirectory(tableDir, htd, forceCreation);
  }

  /**
   * Create a new TableDescriptor in HDFS in the specified table directory. Happens when we create a
   * new table during cluster start or in Clone and Create Table Procedures. Checks readOnly flag
   * passed on construction.
   * @param tableDir      table directory under which we should write the file
   * @param htd           description of the table to write
   * @param forceCreation if <tt>true</tt>, then even if a previous table descriptor is present it
   *                      will be overwritten
   * @return <tt>true</tt> if we successfully created the file, <tt>false</tt> if the file already
   *         exists and we weren't forcing the descriptor creation.
   * @throws IOException if a filesystem error occurs
   */
  public boolean createTableDescriptorForTableDirectory(Path tableDir, TableDescriptor htd,
    boolean forceCreation) throws IOException {
    if (this.fsreadonly) {
      throw new NotImplementedException("Cannot create a table descriptor - in read only mode");
    }
    return createTableDescriptorForTableDirectory(this.fs, tableDir, htd, forceCreation);
  }

  /**
   * Create a new TableDescriptor in HDFS in the specified table directory. Happens when we create a
   * new table or snapshot a table. Does not enforce read-only; that is for the caller to determine.
   * @param fs            Filesystem to use.
   * @param tableDir      table directory under which we should write the file
   * @param htd           description of the table to write
   * @param forceCreation if <tt>true</tt>, then even if a previous table descriptor is present it
   *                      will be overwritten
   * @return <tt>true</tt> if we successfully created the file, <tt>false</tt> if the file already
   *         exists and we weren't forcing the descriptor creation.
   * @throws IOException if a filesystem error occurs
   */
  public static boolean createTableDescriptorForTableDirectory(FileSystem fs, Path tableDir,
    TableDescriptor htd, boolean forceCreation) throws IOException {
    Optional<Pair<FileStatus, TableDescriptor>> opt = getTableDescriptorFromFs(fs, tableDir, false);
    if (opt.isPresent()) {
      LOG.debug("Current path={}", opt.get().getFirst());
      if (!forceCreation) {
        if (htd.equals(opt.get().getSecond())) {
          LOG.trace("TableInfo already exists.. Skipping creation");
          return false;
        }
      }
    }
    return writeTableDescriptor(fs, htd, tableDir, opt.map(Pair::getFirst).orElse(null)) != null;
  }
}