001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET; 021 022import java.io.IOException; 023import java.util.List; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.HConstants; 026import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 027import org.apache.hadoop.hbase.util.Pair; 028import org.apache.hadoop.security.token.Token; 029import org.apache.yetus.audience.InterfaceAudience; 030 031import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 032import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 033import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 034import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 035import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 036import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 037import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 038import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 039import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 040import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 041 042/** 043 * Client proxy for SecureBulkLoadProtocol 044 */ 045@InterfaceAudience.Private 046public class SecureBulkLoadClient { 047 private Table table; 048 private final RpcControllerFactory rpcControllerFactory; 049 050 public SecureBulkLoadClient(final Configuration conf, Table table) { 051 this.table = table; 052 this.rpcControllerFactory = new RpcControllerFactory(conf); 053 } 054 055 public String prepareBulkLoad(final Connection conn) throws IOException { 056 try { 057 ClientServiceCallable<String> callable = 058 new ClientServiceCallable<String>(conn, table.getName(), HConstants.EMPTY_START_ROW, 059 this.rpcControllerFactory.newController(), PRIORITY_UNSET) { 060 @Override 061 protected String rpcCall() throws Exception { 062 byte[] regionName = getLocation().getRegionInfo().getRegionName(); 063 RegionSpecifier region = 064 RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName); 065 PrepareBulkLoadRequest request = PrepareBulkLoadRequest.newBuilder() 066 .setTableName(ProtobufUtil.toProtoTableName(table.getName())).setRegion(region) 067 .build(); 068 PrepareBulkLoadResponse response = getStub().prepareBulkLoad(null, request); 069 return response.getBulkToken(); 070 } 071 }; 072 return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null) 073 .<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE); 074 } catch (Throwable throwable) { 075 throw new IOException(throwable); 076 } 077 } 078 079 public void cleanupBulkLoad(final Connection conn, final String bulkToken) throws IOException { 080 try { 081 ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn, table.getName(), 082 HConstants.EMPTY_START_ROW, this.rpcControllerFactory.newController(), PRIORITY_UNSET) { 083 @Override 084 protected Void rpcCall() throws Exception { 085 byte[] regionName = getLocation().getRegionInfo().getRegionName(); 086 RegionSpecifier region = 087 RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName); 088 CleanupBulkLoadRequest request = 089 CleanupBulkLoadRequest.newBuilder().setRegion(region).setBulkToken(bulkToken).build(); 090 getStub().cleanupBulkLoad(null, request); 091 return null; 092 } 093 }; 094 RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null).<Void> newCaller() 095 .callWithRetries(callable, Integer.MAX_VALUE); 096 } catch (Throwable throwable) { 097 throw new IOException(throwable); 098 } 099 } 100 101 /** 102 * Securely bulk load a list of HFiles using client protocol. nnnnnn * @return true if all are 103 * loaded n 104 */ 105 public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client, 106 final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum, 107 final Token<?> userToken, final String bulkToken) throws IOException { 108 return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken, bulkToken, 109 false, null, true); 110 } 111 112 /** 113 * Securely bulk load a list of HFiles using client protocol. nnnnnnn * @return true if all are 114 * loaded n 115 */ 116 public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client, 117 final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum, 118 final Token<?> userToken, final String bulkToken, boolean copyFiles) throws IOException { 119 return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken, bulkToken, 120 copyFiles, null, true); 121 } 122 123 public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client, 124 final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum, 125 final Token<?> userToken, final String bulkToken, boolean copyFiles, List<String> clusterIds, 126 boolean replicate) throws IOException { 127 BulkLoadHFileRequest request = RequestConverter.buildBulkLoadHFileRequest(familyPaths, 128 regionName, assignSeqNum, userToken, bulkToken, copyFiles, clusterIds, replicate); 129 130 try { 131 BulkLoadHFileResponse response = client.bulkLoadHFile(null, request); 132 return response.getLoaded(); 133 } catch (Exception se) { 134 throw ProtobufUtil.handleRemoteException(se); 135 } 136 } 137}