001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.client.example;
020
021import java.io.Closeable;
022import java.io.IOException;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.conf.Configured;
025import org.apache.hadoop.hbase.HBaseConfiguration;
026import org.apache.hadoop.hbase.HConstants;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.client.Connection;
029import org.apache.hadoop.hbase.client.ConnectionFactory;
030import org.apache.hadoop.hbase.client.Table;
031import org.apache.hadoop.hbase.client.coprocessor.Batch;
032import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback;
033import org.apache.hadoop.hbase.ipc.ServerRpcController;
034import org.apache.hadoop.hbase.protobuf.generated.RefreshHFilesProtos;
035import org.apache.hadoop.util.Tool;
036import org.apache.hadoop.util.ToolRunner;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * This client class is for invoking the refresh HFile function deployed on the
043 * Region Server side via the RefreshHFilesService.
044 */
045@InterfaceAudience.Private
046public class RefreshHFilesClient extends Configured implements Tool, Closeable {
047  private static final Logger LOG = LoggerFactory.getLogger(RefreshHFilesClient.class);
048  private final Connection connection;
049
050  /**
051   * Constructor with Conf object
052   *
053   * @param cfg the {@link Configuration} object to use
054   */
055  public RefreshHFilesClient(Configuration cfg) {
056    try {
057      this.connection = ConnectionFactory.createConnection(cfg);
058    } catch (IOException e) {
059      throw new RuntimeException(e);
060    }
061  }
062
063  @Override
064  public void close() throws IOException {
065    if (this.connection != null && !this.connection.isClosed()) {
066      this.connection.close();
067    }
068  }
069
070  public void refreshHFiles(final TableName tableName) throws Throwable {
071    try (Table table = connection.getTable(tableName)) {
072      refreshHFiles(table);
073    }
074  }
075
076  public void refreshHFiles(final Table table) throws Throwable {
077    final RefreshHFilesProtos.RefreshHFilesRequest request =
078            RefreshHFilesProtos.RefreshHFilesRequest.getDefaultInstance();
079    table.coprocessorService(RefreshHFilesProtos.RefreshHFilesService.class,
080            HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
081            new Batch.Call<RefreshHFilesProtos.RefreshHFilesService,
082                    RefreshHFilesProtos.RefreshHFilesResponse>() {
083        @Override
084        public RefreshHFilesProtos.RefreshHFilesResponse call(
085              RefreshHFilesProtos.RefreshHFilesService refreshHFilesService)
086              throws IOException {
087          ServerRpcController controller = new ServerRpcController();
088          BlockingRpcCallback<RefreshHFilesProtos.RefreshHFilesResponse> rpcCallback =
089                new BlockingRpcCallback<>();
090          refreshHFilesService.refreshHFiles(controller, request, rpcCallback);
091
092          if (controller.failedOnException()) {
093            throw controller.getFailedOn();
094          }
095
096          return rpcCallback.get();
097        }
098      });
099    LOG.debug("Done refreshing HFiles");
100  }
101
102  @Override
103  public int run(String[] args) throws Exception {
104    if (args.length != 1) {
105      String message = "When there are multiple HBase clusters are sharing a common root dir, "
106          + "especially for read replica cluster (see detail in HBASE-18477), please consider to "
107          + "use this tool manually sync the flushed HFiles from the source cluster.";
108      message += "\nUsage: " + this.getClass().getName() + " tableName";
109      System.out.println(message);
110      return -1;
111    }
112    final TableName tableName = TableName.valueOf(args[0]);
113    try {
114      refreshHFiles(tableName);
115    } catch (Throwable t) {
116      LOG.error("Refresh HFiles from table " + tableName.getNameAsString() + "  failed: ", t);
117      return -1;
118    }
119    return 0;
120  }
121
122  public static void main(String[] args) throws Exception {
123    ToolRunner.run(new RefreshHFilesClient(HBaseConfiguration.create()), args);
124  }
125}