001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client.example;
019
020import java.io.Closeable;
021import java.io.IOException;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.conf.Configured;
024import org.apache.hadoop.hbase.HBaseConfiguration;
025import org.apache.hadoop.hbase.HConstants;
026import org.apache.hadoop.hbase.TableName;
027import org.apache.hadoop.hbase.client.Connection;
028import org.apache.hadoop.hbase.client.ConnectionFactory;
029import org.apache.hadoop.hbase.client.Table;
030import org.apache.hadoop.hbase.client.coprocessor.Batch;
031import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback;
032import org.apache.hadoop.hbase.ipc.ServerRpcController;
033import org.apache.hadoop.util.Tool;
034import org.apache.hadoop.util.ToolRunner;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039import org.apache.hadoop.hbase.shaded.protobuf.generated.RefreshHFilesProtos;
040
041/**
042 * This client class is for invoking the refresh HFile function deployed on the Region Server side
043 * via the RefreshHFilesService.
044 */
045@InterfaceAudience.Private
046public class RefreshHFilesClient extends Configured implements Tool, Closeable {
047  private static final Logger LOG = LoggerFactory.getLogger(RefreshHFilesClient.class);
048  private final Connection connection;
049
050  /**
051   * Constructor with Conf object
052   * @param cfg the {@link Configuration} object to use
053   */
054  public RefreshHFilesClient(Configuration cfg) {
055    try {
056      this.connection = ConnectionFactory.createConnection(cfg);
057    } catch (IOException e) {
058      throw new RuntimeException(e);
059    }
060  }
061
062  @Override
063  public void close() throws IOException {
064    if (this.connection != null && !this.connection.isClosed()) {
065      this.connection.close();
066    }
067  }
068
069  public void refreshHFiles(final TableName tableName) throws Throwable {
070    try (Table table = connection.getTable(tableName)) {
071      refreshHFiles(table);
072    }
073  }
074
075  public void refreshHFiles(final Table table) throws Throwable {
076    final RefreshHFilesProtos.RefreshHFilesRequest request =
077      RefreshHFilesProtos.RefreshHFilesRequest.getDefaultInstance();
078    table.coprocessorService(RefreshHFilesProtos.RefreshHFilesService.class,
079      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, new Batch.Call<
080        RefreshHFilesProtos.RefreshHFilesService, RefreshHFilesProtos.RefreshHFilesResponse>() {
081        @Override
082        public RefreshHFilesProtos.RefreshHFilesResponse
083          call(RefreshHFilesProtos.RefreshHFilesService refreshHFilesService) throws IOException {
084          ServerRpcController controller = new ServerRpcController();
085          BlockingRpcCallback<RefreshHFilesProtos.RefreshHFilesResponse> rpcCallback =
086            new BlockingRpcCallback<>();
087          refreshHFilesService.refreshHFiles(controller, request, rpcCallback);
088
089          if (controller.failedOnException()) {
090            throw controller.getFailedOn();
091          }
092
093          return rpcCallback.get();
094        }
095      });
096    LOG.debug("Done refreshing HFiles");
097  }
098
099  @Override
100  public int run(String[] args) throws Exception {
101    if (args.length != 1) {
102      String message = "When there are multiple HBase clusters are sharing a common root dir, "
103        + "especially for read replica cluster (see detail in HBASE-18477), please consider to "
104        + "use this tool manually sync the flushed HFiles from the source cluster.";
105      message += "\nUsage: " + this.getClass().getName() + " tableName";
106      System.out.println(message);
107      return -1;
108    }
109    final TableName tableName = TableName.valueOf(args[0]);
110    try {
111      refreshHFiles(tableName);
112    } catch (Throwable t) {
113      LOG.error("Refresh HFiles from table " + tableName.getNameAsString() + "  failed: ", t);
114      return -1;
115    }
116    return 0;
117  }
118
119  public static void main(String[] args) throws Exception {
120    ToolRunner.run(new RefreshHFilesClient(HBaseConfiguration.create()), args);
121  }
122}