View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.protobuf;
21  
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.NavigableMap;
29  import java.util.UUID;
30  
31  import com.google.protobuf.HBaseZeroCopyByteString;
32  import org.apache.hadoop.classification.InterfaceAudience;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.CellScanner;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.KeyValue;
37  import org.apache.hadoop.hbase.io.SizedCellScanner;
38  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
39  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
40  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
41  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
42  import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
43  import org.apache.hadoop.hbase.regionserver.wal.HLog;
44  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
45  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
46  import org.apache.hadoop.hbase.util.Pair;
47  
48  import com.google.protobuf.ServiceException;
49  
50  @InterfaceAudience.Private
51  public class ReplicationProtbufUtil {
52    /**
53     * A helper to replicate a list of HLog entries using admin protocol.
54     *
55     * @param admin
56     * @param entries
57     * @throws java.io.IOException
58     */
59    public static void replicateWALEntry(final AdminService.BlockingInterface admin,
60        final HLog.Entry[] entries) throws IOException {
61      Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
62        buildReplicateWALEntryRequest(entries);
63      PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
64      try {
65        admin.replicateWALEntry(controller, p.getFirst());
66      } catch (ServiceException se) {
67        throw ProtobufUtil.getRemoteException(se);
68      }
69    }
70  
71    /**
72     * Create a new ReplicateWALEntryRequest from a list of HLog entries
73     *
74     * @param entries the HLog entries to be replicated
75     * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values
76     * found.
77     */
78    public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
79        buildReplicateWALEntryRequest(final HLog.Entry[] entries) {
80      // Accumulate all the KVs seen in here.
81      List<List<? extends Cell>> allkvs = new ArrayList<List<? extends Cell>>(entries.length);
82      int size = 0;
83      WALProtos.FamilyScope.Builder scopeBuilder = WALProtos.FamilyScope.newBuilder();
84      AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
85      AdminProtos.ReplicateWALEntryRequest.Builder builder =
86        AdminProtos.ReplicateWALEntryRequest.newBuilder();
87      HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
88      for (HLog.Entry entry: entries) {
89        entryBuilder.clear();
90        // TODO: this duplicates a lot in HLogKey#getBuilder
91        WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder();
92        HLogKey key = entry.getKey();
93        keyBuilder.setEncodedRegionName(
94          HBaseZeroCopyByteString.wrap(key.getEncodedRegionName()));
95        keyBuilder.setTableName(HBaseZeroCopyByteString.wrap(key.getTablename().getName()));
96        keyBuilder.setLogSequenceNumber(key.getLogSeqNum());
97        keyBuilder.setWriteTime(key.getWriteTime());
98        if (key.getNonce() != HConstants.NO_NONCE) {
99          keyBuilder.setNonce(key.getNonce());
100       }
101       if (key.getNonceGroup() != HConstants.NO_NONCE) {
102         keyBuilder.setNonceGroup(key.getNonceGroup());
103       }
104       for(UUID clusterId : key.getClusterIds()) {
105         uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
106         uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
107         keyBuilder.addClusterIds(uuidBuilder.build());
108       }
109       WALEdit edit = entry.getEdit();
110       NavigableMap<byte[], Integer> scopes = key.getScopes();
111       if (scopes != null && !scopes.isEmpty()) {
112         for (Map.Entry<byte[], Integer> scope: scopes.entrySet()) {
113           scopeBuilder.setFamily(HBaseZeroCopyByteString.wrap(scope.getKey()));
114           WALProtos.ScopeType scopeType =
115               WALProtos.ScopeType.valueOf(scope.getValue().intValue());
116           scopeBuilder.setScopeType(scopeType);
117           keyBuilder.addScopes(scopeBuilder.build());
118         }
119       }
120       List<KeyValue> kvs = edit.getKeyValues();
121       // Add up the size.  It is used later serializing out the kvs.
122       for (KeyValue kv: kvs) {
123         size += kv.getLength();
124       }
125       // Collect up the kvs
126       allkvs.add(kvs);
127       // Write out how many kvs associated with this entry.
128       entryBuilder.setAssociatedCellCount(kvs.size());
129       builder.addEntry(entryBuilder.build());
130     }
131     return new Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>(builder.build(),
132       getCellScanner(allkvs, size));
133   }
134 
135   /**
136    * @param cells
137    * @return <code>cells</code> packaged as a CellScanner
138    */
139   static CellScanner getCellScanner(final List<List<? extends Cell>> cells, final int size) {
140     return new SizedCellScanner() {
141       private final Iterator<List<? extends Cell>> entries = cells.iterator();
142       private Iterator<? extends Cell> currentIterator = null;
143       private Cell currentCell;
144 
145       @Override
146       public Cell current() {
147         return this.currentCell;
148       }
149 
150       @Override
151       public boolean advance() {
152         if (this.currentIterator == null) {
153           if (!this.entries.hasNext()) return false;
154           this.currentIterator = this.entries.next().iterator();
155         }
156         if (this.currentIterator.hasNext()) {
157           this.currentCell = this.currentIterator.next();
158           return true;
159         }
160         this.currentCell = null;
161         this.currentIterator = null;
162         return advance();
163       }
164 
165       @Override
166       public long heapSize() {
167         return size;
168       }
169     };
170   }
171 }