View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.protobuf;
21  
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.NavigableMap;
29  import java.util.UUID;
30  
31  import org.apache.hadoop.fs.Path;
32  import org.apache.hadoop.hbase.Cell;
33  import org.apache.hadoop.hbase.CellScanner;
34  import org.apache.hadoop.hbase.CellUtil;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.classification.InterfaceAudience;
37  import org.apache.hadoop.hbase.io.SizedCellScanner;
38  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
39  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
40  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
41  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
42  import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
43  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
44  import org.apache.hadoop.hbase.util.ByteStringer;
45  import org.apache.hadoop.hbase.util.Pair;
46  import org.apache.hadoop.hbase.wal.WAL.Entry;
47  import org.apache.hadoop.hbase.wal.WALKey;
48  
49  import com.google.protobuf.ServiceException;
50  
51  @InterfaceAudience.Private
52  public class ReplicationProtbufUtil {
53    /**
54     * A helper to replicate a list of WAL entries using admin protocol.
55     * @param admin Admin service
56     * @param entries Array of WAL entries to be replicated
57     * @param replicationClusterId Id which will uniquely identify source cluster FS client
58     *          configurations in the replication configuration directory
59     * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
60     * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
61     * @throws java.io.IOException
62     */
63    public static void replicateWALEntry(final AdminService.BlockingInterface admin,
64        final Entry[] entries, String replicationClusterId, Path sourceBaseNamespaceDir,
65        Path sourceHFileArchiveDir) throws IOException {
66      Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
67          buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir,
68            sourceHFileArchiveDir);
69      PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
70      try {
71        admin.replicateWALEntry(controller, p.getFirst());
72      } catch (ServiceException se) {
73        throw ProtobufUtil.getRemoteException(se);
74      }
75    }
76  
77    /**
78     * Create a new ReplicateWALEntryRequest from a list of WAL entries
79     *
80     * @param entries the WAL entries to be replicated
81     * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values
82     * found.
83     */
84    public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
85        buildReplicateWALEntryRequest(final Entry[] entries) {
86      return buildReplicateWALEntryRequest(entries, null, null, null, null);
87    }
88  
89    /**
90     * Create a new ReplicateWALEntryRequest from a list of WAL entries
91     * @param entries the WAL entries to be replicated
92     * @param encodedRegionName alternative region name to use if not null
93     * @param replicationClusterId Id which will uniquely identify source cluster FS client
94     *          configurations in the replication configuration directory
95     * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
96     * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
97     * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values found.
98     */
99    public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
100       buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName,
101           String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir) {
102     // Accumulate all the Cells seen in here.
103     List<List<? extends Cell>> allCells = new ArrayList<List<? extends Cell>>(entries.length);
104     int size = 0;
105     WALProtos.FamilyScope.Builder scopeBuilder = WALProtos.FamilyScope.newBuilder();
106     AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
107     AdminProtos.ReplicateWALEntryRequest.Builder builder =
108       AdminProtos.ReplicateWALEntryRequest.newBuilder();
109     HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
110     for (Entry entry: entries) {
111       entryBuilder.clear();
112       // TODO: this duplicates a lot in WALKey#getBuilder
113       WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder();
114       WALKey key = entry.getKey();
115       keyBuilder.setEncodedRegionName(
116         ByteStringer.wrap(encodedRegionName == null
117             ? key.getEncodedRegionName()
118             : encodedRegionName));
119       keyBuilder.setTableName(ByteStringer.wrap(key.getTablename().getName()));
120       keyBuilder.setLogSequenceNumber(key.getLogSeqNum());
121       keyBuilder.setWriteTime(key.getWriteTime());
122       if (key.getNonce() != HConstants.NO_NONCE) {
123         keyBuilder.setNonce(key.getNonce());
124       }
125       if (key.getNonceGroup() != HConstants.NO_NONCE) {
126         keyBuilder.setNonceGroup(key.getNonceGroup());
127       }
128       for(UUID clusterId : key.getClusterIds()) {
129         uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
130         uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
131         keyBuilder.addClusterIds(uuidBuilder.build());
132       }
133       if(key.getOrigLogSeqNum() > 0) {
134         keyBuilder.setOrigSequenceNumber(key.getOrigLogSeqNum());
135       }
136       WALEdit edit = entry.getEdit();
137       NavigableMap<byte[], Integer> scopes = key.getReplicationScopes();
138       if (scopes != null && !scopes.isEmpty()) {
139         for (Map.Entry<byte[], Integer> scope: scopes.entrySet()) {
140           scopeBuilder.setFamily(ByteStringer.wrap(scope.getKey()));
141           WALProtos.ScopeType scopeType =
142               WALProtos.ScopeType.valueOf(scope.getValue().intValue());
143           scopeBuilder.setScopeType(scopeType);
144           keyBuilder.addScopes(scopeBuilder.build());
145         }
146       }
147       List<Cell> cells = edit.getCells();
148       // Add up the size.  It is used later serializing out the kvs.
149       for (Cell cell: cells) {
150         size += CellUtil.estimatedSerializedSizeOf(cell);
151       }
152       // Collect up the cells
153       allCells.add(cells);
154       // Write out how many cells associated with this entry.
155       entryBuilder.setAssociatedCellCount(cells.size());
156       builder.addEntry(entryBuilder.build());
157     }
158 
159     if (replicationClusterId != null) {
160       builder.setReplicationClusterId(replicationClusterId);
161     }
162     if (sourceBaseNamespaceDir != null) {
163       builder.setSourceBaseNamespaceDirPath(sourceBaseNamespaceDir.toString());
164     }
165     if (sourceHFileArchiveDir != null) {
166       builder.setSourceHFileArchiveDirPath(sourceHFileArchiveDir.toString());
167     }
168 
169     return new Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>(builder.build(),
170       getCellScanner(allCells, size));
171   }
172 
173   /**
174    * @param cells
175    * @return <code>cells</code> packaged as a CellScanner
176    */
177   static CellScanner getCellScanner(final List<List<? extends Cell>> cells, final int size) {
178     return new SizedCellScanner() {
179       private final Iterator<List<? extends Cell>> entries = cells.iterator();
180       private Iterator<? extends Cell> currentIterator = null;
181       private Cell currentCell;
182 
183       @Override
184       public Cell current() {
185         return this.currentCell;
186       }
187 
188       @Override
189       public boolean advance() {
190         if (this.currentIterator == null) {
191           if (!this.entries.hasNext()) return false;
192           this.currentIterator = this.entries.next().iterator();
193         }
194         if (this.currentIterator.hasNext()) {
195           this.currentCell = this.currentIterator.next();
196           return true;
197         }
198         this.currentCell = null;
199         this.currentIterator = null;
200         return advance();
201       }
202 
203       @Override
204       public long heapSize() {
205         return size;
206       }
207     };
208   }
209 }