001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.client; 020 021import java.io.IOException; 022import java.util.List; 023 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.HConstants; 026import org.apache.yetus.audience.InterfaceAudience; 027import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 028import org.apache.hadoop.hbase.util.Pair; 029import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 030import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 031import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 032import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 033import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 034import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 035import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 036import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 037import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 038import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 039import org.apache.hadoop.security.token.Token; 040 041import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET; 042 043/** 044 * Client proxy for SecureBulkLoadProtocol 045 */ 046@InterfaceAudience.Private 047public class SecureBulkLoadClient { 048 private Table table; 049 private final RpcControllerFactory rpcControllerFactory; 050 051 public SecureBulkLoadClient(final Configuration conf, Table table) { 052 this.table = table; 053 this.rpcControllerFactory = new RpcControllerFactory(conf); 054 } 055 056 public String prepareBulkLoad(final Connection conn) throws IOException { 057 try { 058 ClientServiceCallable<String> callable = new ClientServiceCallable<String>(conn, 059 table.getName(), HConstants.EMPTY_START_ROW, 060 this.rpcControllerFactory.newController(), PRIORITY_UNSET) { 061 @Override 062 protected String rpcCall() throws Exception { 063 byte[] regionName = getLocation().getRegionInfo().getRegionName(); 064 RegionSpecifier region = 065 RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName); 066 PrepareBulkLoadRequest request = PrepareBulkLoadRequest.newBuilder() 067 .setTableName(ProtobufUtil.toProtoTableName(table.getName())) 068 .setRegion(region).build(); 069 PrepareBulkLoadResponse response = getStub().prepareBulkLoad(null, request); 070 return response.getBulkToken(); 071 } 072 }; 073 return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null) 074 .<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE); 075 } catch (Throwable throwable) { 076 throw new IOException(throwable); 077 } 078 } 079 080 public void cleanupBulkLoad(final Connection conn, final String bulkToken) throws IOException { 081 try { 082 ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn, 083 table.getName(), HConstants.EMPTY_START_ROW, this.rpcControllerFactory.newController(), PRIORITY_UNSET) { 084 @Override 085 protected Void rpcCall() throws Exception { 086 byte[] regionName = getLocation().getRegionInfo().getRegionName(); 087 RegionSpecifier region = RequestConverter.buildRegionSpecifier( 088 RegionSpecifierType.REGION_NAME, regionName); 089 CleanupBulkLoadRequest request = 090 CleanupBulkLoadRequest.newBuilder().setRegion(region).setBulkToken(bulkToken).build(); 091 getStub().cleanupBulkLoad(null, request); 092 return null; 093 } 094 }; 095 RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null) 096 .<Void> newCaller().callWithRetries(callable, Integer.MAX_VALUE); 097 } catch (Throwable throwable) { 098 throw new IOException(throwable); 099 } 100 } 101 102 /** 103 * Securely bulk load a list of HFiles using client protocol. 104 * 105 * @param client 106 * @param familyPaths 107 * @param regionName 108 * @param assignSeqNum 109 * @param userToken 110 * @param bulkToken 111 * @return true if all are loaded 112 * @throws IOException 113 */ 114 public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client, 115 final List<Pair<byte[], String>> familyPaths, 116 final byte[] regionName, boolean assignSeqNum, 117 final Token<?> userToken, final String bulkToken) throws IOException { 118 return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken, 119 bulkToken, false, null, true); 120 } 121 122 /** 123 * Securely bulk load a list of HFiles using client protocol. 124 * 125 * @param client 126 * @param familyPaths 127 * @param regionName 128 * @param assignSeqNum 129 * @param userToken 130 * @param bulkToken 131 * @param copyFiles 132 * @return true if all are loaded 133 * @throws IOException 134 */ 135 public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client, 136 final List<Pair<byte[], String>> familyPaths, 137 final byte[] regionName, boolean assignSeqNum, 138 final Token<?> userToken, final String bulkToken, 139 boolean copyFiles) throws IOException { 140 return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken, 141 bulkToken, false, null, true); 142 } 143 144 public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client, 145 final List<Pair<byte[], String>> familyPaths, 146 final byte[] regionName, boolean assignSeqNum, 147 final Token<?> userToken, final String bulkToken, 148 boolean copyFiles, List<String> clusterIds, boolean replicate) throws IOException { 149 BulkLoadHFileRequest request = 150 RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, 151 userToken, bulkToken, copyFiles, clusterIds, replicate); 152 153 try { 154 BulkLoadHFileResponse response = client.bulkLoadHFile(null, request); 155 return response.getLoaded(); 156 } catch (Exception se) { 157 throw ProtobufUtil.handleRemoteException(se); 158 } 159 } 160}