View Javadoc

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18  
19  package org.apache.hadoop.hbase.client.coprocessor;
20  
21  import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
22  import static org.apache.hadoop.hbase.HConstants.LAST_ROW;
23  
24  import java.io.IOException;
25  import java.util.ArrayList;
26  import java.util.List;
27  
28  import org.apache.hadoop.fs.Path;
29  import org.apache.hadoop.hbase.TableName;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.hbase.client.Table;
32  import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
33  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
34  import org.apache.hadoop.hbase.ipc.ServerRpcController;
35  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
37  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos;
38  import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
39  import org.apache.hadoop.hbase.util.ByteStringer;
40  import org.apache.hadoop.hbase.util.Pair;
41  import org.apache.hadoop.security.token.Token;
42  
43  /**
44   * Client proxy for SecureBulkLoadProtocol
45   * used in conjunction with SecureBulkLoadEndpoint
46   */
47  @InterfaceAudience.Private
48  public class SecureBulkLoadClient {
49    private Table table;
50  
51    public SecureBulkLoadClient(Table table) {
52      this.table = table;
53    }
54  
55    public String prepareBulkLoad(final TableName tableName) throws IOException {
56      try {
57        return
58          table.coprocessorService(SecureBulkLoadProtos.SecureBulkLoadService.class,
59            EMPTY_START_ROW,
60            LAST_ROW,
61            new Batch.Call<SecureBulkLoadProtos.SecureBulkLoadService,String>() {
62              @Override
63              public String call(SecureBulkLoadProtos.SecureBulkLoadService instance) throws IOException {
64                ServerRpcController controller = new ServerRpcController();
65  
66                BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse> rpcCallback =
67                    new BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse>();
68  
69                SecureBulkLoadProtos.PrepareBulkLoadRequest request =
70                    SecureBulkLoadProtos.PrepareBulkLoadRequest.newBuilder()
71                    .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
72  
73                instance.prepareBulkLoad(controller,
74                    request,
75                    rpcCallback);
76  
77                SecureBulkLoadProtos.PrepareBulkLoadResponse response = rpcCallback.get();
78                if (controller.failedOnException()) {
79                  throw controller.getFailedOn();
80                }
81                return response.getBulkToken();
82              }
83            }).entrySet().iterator().next().getValue();
84      } catch (Throwable throwable) {
85        throw new IOException(throwable);
86      }
87    }
88  
89    public void cleanupBulkLoad(final String bulkToken) throws IOException {
90      try {
91          table.coprocessorService(SecureBulkLoadProtos.SecureBulkLoadService.class,
92              EMPTY_START_ROW,
93              LAST_ROW,
94              new Batch.Call<SecureBulkLoadProtos.SecureBulkLoadService, String>() {
95  
96                @Override
97                public String call(SecureBulkLoadProtos.SecureBulkLoadService instance) throws IOException {
98                  ServerRpcController controller = new ServerRpcController();
99  
100                 BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse> rpcCallback =
101                     new BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse>();
102 
103                 SecureBulkLoadProtos.CleanupBulkLoadRequest request =
104                     SecureBulkLoadProtos.CleanupBulkLoadRequest.newBuilder()
105                         .setBulkToken(bulkToken).build();
106 
107                 instance.cleanupBulkLoad(controller,
108                     request,
109                     rpcCallback);
110 
111                 if (controller.failedOnException()) {
112                   throw controller.getFailedOn();
113                 }
114                 return null;
115               }
116             });
117     } catch (Throwable throwable) {
118       throw new IOException(throwable);
119     }
120   }
121 
122   public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
123                          final Token<?> userToken,
124                          final String bulkToken,
125                          final byte[] startRow) throws IOException {
126     // we never want to send a batch of HFiles to all regions, thus cannot call
127     // HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639
128     try {
129       CoprocessorRpcChannel channel = table.coprocessorService(startRow);
130       SecureBulkLoadProtos.SecureBulkLoadService instance =
131           ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
132 
133       SecureBulkLoadProtos.DelegationToken protoDT =
134           SecureBulkLoadProtos.DelegationToken.newBuilder().build();
135       if(userToken != null) {
136         protoDT =
137             SecureBulkLoadProtos.DelegationToken.newBuilder()
138               .setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
139               .setPassword(ByteStringer.wrap(userToken.getPassword()))
140               .setKind(userToken.getKind().toString())
141               .setService(userToken.getService().toString()).build();
142       }
143 
144       List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths =
145           new ArrayList<ClientProtos.BulkLoadHFileRequest.FamilyPath>();
146       for(Pair<byte[], String> el: familyPaths) {
147         protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder()
148           .setFamily(ByteStringer.wrap(el.getFirst()))
149           .setPath(el.getSecond()).build());
150       }
151 
152       SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request =
153           SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder()
154             .setFsToken(protoDT)
155             .addAllFamilyPath(protoFamilyPaths)
156             .setBulkToken(bulkToken).build();
157 
158       ServerRpcController controller = new ServerRpcController();
159       BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse> rpcCallback =
160           new BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse>();
161       instance.secureBulkLoadHFiles(controller,
162         request,
163         rpcCallback);
164 
165       SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get();
166       if (controller.failedOnException()) {
167         throw controller.getFailedOn();
168       }
169       return response.getLoaded();
170     } catch (Throwable throwable) {
171       throw new IOException(throwable);
172     }
173   }
174 
175   public Path getStagingPath(String bulkToken, byte[] family) throws IOException {
176     return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), bulkToken, family);
177   }
178 }