001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.client.example;
020
021import java.io.Closeable;
022import java.io.IOException;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.conf.Configured;
025import org.apache.hadoop.hbase.HBaseConfiguration;
026import org.apache.hadoop.hbase.HConstants;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.client.Connection;
029import org.apache.hadoop.hbase.client.ConnectionFactory;
030import org.apache.hadoop.hbase.client.Table;
031import org.apache.hadoop.hbase.client.coprocessor.Batch;
032import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback;
033import org.apache.hadoop.hbase.ipc.ServerRpcController;
034import org.apache.hadoop.util.Tool;
035import org.apache.hadoop.util.ToolRunner;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040import org.apache.hadoop.hbase.shaded.protobuf.generated.RefreshHFilesProtos;
041
042/**
043 * This client class is for invoking the refresh HFile function deployed on the
044 * Region Server side via the RefreshHFilesService.
045 */
046@InterfaceAudience.Private
047public class RefreshHFilesClient extends Configured implements Tool, Closeable {
048  private static final Logger LOG = LoggerFactory.getLogger(RefreshHFilesClient.class);
049  private final Connection connection;
050
051  /**
052   * Constructor with Conf object
053   *
054   * @param cfg the {@link Configuration} object to use
055   */
056  public RefreshHFilesClient(Configuration cfg) {
057    try {
058      this.connection = ConnectionFactory.createConnection(cfg);
059    } catch (IOException e) {
060      throw new RuntimeException(e);
061    }
062  }
063
064  @Override
065  public void close() throws IOException {
066    if (this.connection != null && !this.connection.isClosed()) {
067      this.connection.close();
068    }
069  }
070
071  public void refreshHFiles(final TableName tableName) throws Throwable {
072    try (Table table = connection.getTable(tableName)) {
073      refreshHFiles(table);
074    }
075  }
076
077  public void refreshHFiles(final Table table) throws Throwable {
078    final RefreshHFilesProtos.RefreshHFilesRequest request =
079            RefreshHFilesProtos.RefreshHFilesRequest.getDefaultInstance();
080    table.coprocessorService(RefreshHFilesProtos.RefreshHFilesService.class,
081            HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
082            new Batch.Call<RefreshHFilesProtos.RefreshHFilesService,
083                    RefreshHFilesProtos.RefreshHFilesResponse>() {
084        @Override
085        public RefreshHFilesProtos.RefreshHFilesResponse call(
086              RefreshHFilesProtos.RefreshHFilesService refreshHFilesService)
087              throws IOException {
088          ServerRpcController controller = new ServerRpcController();
089          BlockingRpcCallback<RefreshHFilesProtos.RefreshHFilesResponse> rpcCallback =
090                new BlockingRpcCallback<>();
091          refreshHFilesService.refreshHFiles(controller, request, rpcCallback);
092
093          if (controller.failedOnException()) {
094            throw controller.getFailedOn();
095          }
096
097          return rpcCallback.get();
098        }
099      });
100    LOG.debug("Done refreshing HFiles");
101  }
102
103  @Override
104  public int run(String[] args) throws Exception {
105    if (args.length != 1) {
106      String message = "When there are multiple HBase clusters are sharing a common root dir, "
107          + "especially for read replica cluster (see detail in HBASE-18477), please consider to "
108          + "use this tool manually sync the flushed HFiles from the source cluster.";
109      message += "\nUsage: " + this.getClass().getName() + " tableName";
110      System.out.println(message);
111      return -1;
112    }
113    final TableName tableName = TableName.valueOf(args[0]);
114    try {
115      refreshHFiles(tableName);
116    } catch (Throwable t) {
117      LOG.error("Refresh HFiles from table " + tableName.getNameAsString() + "  failed: ", t);
118      return -1;
119    }
120    return 0;
121  }
122
123  public static void main(String[] args) throws Exception {
124    ToolRunner.run(new RefreshHFilesClient(HBaseConfiguration.create()), args);
125  }
126}