001    // =================================================================================================
002    // Copyright 2011 Twitter, Inc.
003    // -------------------------------------------------------------------------------------------------
004    // Licensed under the Apache License, Version 2.0 (the "License");
005    // you may not use this work except in compliance with the License.
006    // You may obtain a copy of the License in the LICENSE file, or at:
007    //
008    //  http://www.apache.org/licenses/LICENSE-2.0
009    //
010    // Unless required by applicable law or agreed to in writing, software
011    // distributed under the License is distributed on an "AS IS" BASIS,
012    // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013    // See the License for the specific language governing permissions and
014    // limitations under the License.
015    // =================================================================================================
016    
017    package com.twitter.common.zookeeper;
018    
019    import com.google.common.annotations.VisibleForTesting;
020    import com.google.common.base.Predicate;
021    import com.google.common.collect.Ordering;
022    import com.twitter.common.zookeeper.Group.GroupChangeListener;
023    import com.twitter.common.zookeeper.Group.JoinException;
024    import com.twitter.common.zookeeper.Group.Membership;
025    import com.twitter.common.zookeeper.Group.UpdateException;
026    import com.twitter.common.zookeeper.Group.WatchException;
027    import org.apache.zookeeper.data.ACL;
028    
029    import javax.annotation.Nullable;
030    import java.util.List;
031    import java.util.logging.Logger;
032    
033    /**
034     * A distributed mechanism for eventually arriving at an evenly partitioned space of long values.
035     * A typical usage would have a client on each of several hosts joining a logical partition (a
036     * "partition group") that represents some shared work.  Clients could then process a subset of a
037     * full body of work by testing any given item of work with their partition filter.
038     *
039     * <p>Note that clients must be able to tolerate periods of duplicate processing by more than 1
040     * partition as explained in {@link #join()}.
041     *
042     * @author John Sirois
043     */
044    public class Partitioner {
045    
046      private static final Logger LOG = Logger.getLogger(Partitioner.class.getName());
047    
048      private volatile int groupSize;
049      private volatile int groupIndex;
050      private final Group group;
051    
052      /**
053       * Constructs a representation of a partition group but does not join it.  Note that the partition
054       * group path will be created as a persistent zookeeper path if it does not already exist.
055       *
056       * @param zkClient a client to use for joining the partition group and watching its membership
057       * @param acl the acl for this partition group
058       * @param path a zookeeper path that represents the partition group
059       */
060      public Partitioner(ZooKeeperClient zkClient, List<ACL> acl, String path) {
061        group = new Group(zkClient, acl, path);
062      }
063    
064      @VisibleForTesting
065      int getGroupSize() {
066        return groupSize;
067      }
068    
069      /**
070       * Represents a slice of a partition group.  The partition is dynamic and will adjust its size as
071       * members join and leave its partition group.
072       */
073      public abstract static class Partition implements Predicate<Long>, Membership {
074    
075        /**
076         * Returns {@code true} if the given {@code value} is a member of this partition at this time.
077         */
078        public abstract boolean isMember(long value);
079    
080        /**
081         * Gets number of members in the group at this time.
082         *
083         * @return number of members in the ZK group at this time.
084         */
085        public abstract int getNumPartitions();
086    
087        /**
088         * Evaluates partition membership based on the given {@code value}'s hash code.  If the value
089         * is null it is never a member of a partition.
090         */
091        boolean isMember(Object value) {
092          return (value != null) && isMember(value.hashCode());
093        }
094    
095        /**
096         * Equivalent to {@link #isMember(long)} for all non-null values; however incurs unboxing
097         * overhead.
098         */
099        @Override
100        public boolean apply(@Nullable Long input) {
101          return (input != null) && isMember(input);
102        }
103      }
104    
105      /**
106       * Attempts to join the partition group and claim a slice.  When successful, a predicate is
107       * returned that can be used to test whether or not an item belongs to this partition.  The
108       * predicate is dynamic such that as the group is further partitioned or partitions merge the
109       * predicate will claim a narrower or wider swath of the partition space respectively.  Partition
110       * creation and merging is not instantaneous and clients should expect independent partitions to
111       * claim ownership of some items when partition membership is in flux.  It is only in the steady
112       * state that a client should expect independent partitions to divide the partition space evenly
113       * and without overlap.
114       *
115       * <p>TODO(John Sirois): consider adding a version with a global timeout for the join operation.
116       *
117       * @return the partition representing the slice of the partition group this member can claim
118       * @throws JoinException if there was a problem joining the partition group
119       * @throws InterruptedException if interrupted while waiting to join the partition group
120       */
121      public final Partition join() throws JoinException, InterruptedException {
122        final Membership membership = group.join();
123        try {
124          group.watch(createGroupChangeListener(membership));
125        } catch (WatchException e) {
126          membership.cancel();
127          throw new JoinException("Problem establishing watch on group after joining it", e);
128        }
129        return new Partition() {
130          @Override public boolean isMember(long value) {
131            return (value % groupSize) == groupIndex;
132          }
133    
134          @Override public int getNumPartitions() {
135            return groupSize;
136          }
137    
138          @Override public String getGroupPath() {
139            return membership.getGroupPath();
140          }
141    
142          @Override public String getMemberId() {
143            return membership.getMemberId();
144          }
145    
146          @Override public String getMemberPath() {
147            return membership.getMemberPath();
148          }
149    
150          @Override public byte[] updateMemberData() throws UpdateException {
151            return membership.updateMemberData();
152          }
153    
154          @Override public void cancel() throws JoinException {
155            membership.cancel();
156          }
157        };
158      }
159    
160      @VisibleForTesting GroupChangeListener createGroupChangeListener(final Membership membership) {
161        return new GroupChangeListener() {
162          @Override public void onGroupChange(Iterable<String> memberIds) {
163            List<String> members = Ordering.natural().sortedCopy(memberIds);
164            int newSize = members.size();
165            int newIndex = members.indexOf(membership.getMemberId());
166    
167            LOG.info(String.format("Rebuilding group %s:%s [%d:%d]->[%d:%d]",
168                membership.getGroupPath(), members, groupSize, groupIndex, newSize, newIndex));
169    
170            groupSize = newSize;
171            groupIndex = newIndex;
172          }
173        };
174      }
175    }