001    // =================================================================================================
002    // Copyright 2011 Twitter, Inc.
003    // -------------------------------------------------------------------------------------------------
004    // Licensed under the Apache License, Version 2.0 (the "License");
005    // you may not use this work except in compliance with the License.
006    // You may obtain a copy of the License in the LICENSE file, or at:
007    //
008    //  http://www.apache.org/licenses/LICENSE-2.0
009    //
010    // Unless required by applicable law or agreed to in writing, software
011    // distributed under the License is distributed on an "AS IS" BASIS,
012    // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013    // See the License for the specific language governing permissions and
014    // limitations under the License.
015    // =================================================================================================
016    
017    package com.twitter.common.zookeeper;
018    
019    import java.util.List;
020    import java.util.Set;
021    import java.util.logging.Level;
022    import java.util.logging.Logger;
023    import java.util.regex.Pattern;
024    
025    import javax.annotation.Nullable;
026    
027    import com.google.common.annotations.VisibleForTesting;
028    import com.google.common.base.Preconditions;
029    import com.google.common.base.Predicate;
030    import com.google.common.base.Supplier;
031    import com.google.common.base.Suppliers;
032    import com.google.common.collect.ImmutableList;
033    import com.google.common.collect.ImmutableSet;
034    import com.google.common.collect.Iterables;
035    
036    import org.apache.commons.lang.ArrayUtils;
037    import org.apache.commons.lang.StringUtils;
038    import org.apache.zookeeper.CreateMode;
039    import org.apache.zookeeper.KeeperException;
040    import org.apache.zookeeper.KeeperException.NoNodeException;
041    import org.apache.zookeeper.WatchedEvent;
042    import org.apache.zookeeper.Watcher;
043    import org.apache.zookeeper.Watcher.Event.EventType;
044    import org.apache.zookeeper.common.PathUtils;
045    import org.apache.zookeeper.data.ACL;
046    
047    import com.twitter.common.base.Command;
048    import com.twitter.common.base.Commands;
049    import com.twitter.common.base.ExceptionalSupplier;
050    import com.twitter.common.base.MorePreconditions;
051    import com.twitter.common.util.BackoffHelper;
052    import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
053    
054    /**
055     * This class exposes methods for joining and monitoring distributed groups.  The groups this class
056     * monitors are realized as persistent paths in ZooKeeper with ephemeral sequential child nodes for
057     * each member of a group.
058     *
059     * @author John Sirois
060     */
061    public class Group {
062      private static final Logger LOG = Logger.getLogger(Group.class.getName());
063    
064      private static final Supplier<byte[]> NO_MEMBER_DATA = Suppliers.ofInstance(null);
065      private static final String DEFAULT_NODE_NAME_PREFIX = "member_";
066    
067      private Predicate<String> nodeNameFilter;
068    
069      private final ZooKeeperClient zkClient;
070      private final ImmutableList<ACL> acl;
071      private final String path;
072      private final String nodeNamePrefix;
073    
074      private final BackoffHelper backoffHelper;
075    
076      @VisibleForTesting static String normalizePath(String path) {
077        String normalizedPath = path.replaceAll("//+", "/").replaceFirst("(.+)/$", "$1");
078        PathUtils.validatePath(normalizedPath);
079        return normalizedPath;
080      }
081    
082      /**
083       * Creates a group rooted at the given {@code path}.  Paths must be absolute and trailing or
084       * duplicate slashes will be normalized.  For example, all the following paths would create a
085       * group at the normalized path /my/distributed/group:
086       * <ul>
087       *   <li>/my/distributed/group
088       *   <li>/my/distributed/group/
089       *   <li>/my/distributed//group
090       * </ul>
091       *
092       * @param zkClient the client to use for interactions with ZooKeeper
093       * @param acl the ACL to use for creating the persistent group path if it does not already exist
094       * @param path the absolute persistent path that represents this group
095       * @param nodeNamePrefix Node name prefix that denotes group membership.
096       */
097      public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, String nodeNamePrefix) {
098        this.zkClient = Preconditions.checkNotNull(zkClient);
099        this.acl = ImmutableList.copyOf(acl);
100        this.path = normalizePath(Preconditions.checkNotNull(path));
101        this.nodeNamePrefix = MorePreconditions.checkNotBlank(nodeNamePrefix);
102    
103        final Pattern groupNodeNamePattern = Pattern.compile(
104            "^" + Pattern.quote(nodeNamePrefix) + "[0-9]+$");
105        nodeNameFilter = new Predicate<String>() {
106            @Override public boolean apply(String childNodeName) {
107              return groupNodeNamePattern.matcher(childNodeName).matches();
108            }
109          };
110    
111        backoffHelper = new BackoffHelper();
112      }
113    
114      /**
115       * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, String)} with a default
116       * {@code nodeNamePrefix} of 'member_'.
117       */
118      public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
119        this(zkClient, acl, path, DEFAULT_NODE_NAME_PREFIX);
120      }
121    
122      public String getMemberPath(String memberId) {
123        return path + "/" + MorePreconditions.checkNotBlank(memberId);
124      }
125    
126      public String getMemberId(String nodePath) {
127        MorePreconditions.checkNotBlank(nodePath);
128        Preconditions.checkArgument(nodePath.startsWith(path + "/"),
129            "Not a member of this group[%s]: %s", path, nodePath);
130        return extractMemberId(nodePath);
131      }
132    
133      private String extractMemberId(String nodePath) {
134        String memberId = StringUtils.substringAfterLast(nodePath, "/");
135        Preconditions.checkArgument(nodeNameFilter.apply(memberId), "Not a group member: %s", memberId);
136        return memberId;
137      }
138    
139      /**
140       * Returns the current list of group member ids by querying ZooKeeper synchronously.
141       *
142       * @return the ids of all the present members of this group
143       * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
144       * @throws KeeperException if there was a problem reading this group's member ids
145       * @throws InterruptedException if this thread is interrupted listing the group members
146       */
147      public Iterable<String> getMemberIds()
148          throws ZooKeeperConnectionException, KeeperException, InterruptedException {
149        return Iterables.filter(zkClient.get().getChildren(path, false), nodeNameFilter);
150      }
151    
152      /**
153       * Gets the data for one of this groups members by querying ZooKeeper synchronously.
154       *
155       * @param memberId the id of the member whose data to retrieve
156       * @return the data associated with the {@code memberId}
157       * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
158       * @throws KeeperException if there was a problem reading this member's data
159       * @throws InterruptedException if this thread is interrupted retrieving the member data
160       */
161      public byte[] getMemberData(String memberId)
162          throws ZooKeeperConnectionException, KeeperException, InterruptedException {
163        return zkClient.get().getData(getMemberPath(memberId), false, null);
164      }
165    
166      /**
167       * Represents membership in a distributed group.
168       */
169      public interface Membership {
170    
171        /**
172         * Returns the persistent ZooKeeper path that represents this group.
173         */
174        String getGroupPath();
175    
176        /**
177         * Returns the id (ZooKeeper node name) of this group member.  May change over time if the
178         * ZooKeeper session expires.
179         */
180        String getMemberId();
181    
182        /**
183         * Returns the full ZooKeeper path to this group member.  May change over time if the
184         * ZooKeeper session expires.
185         */
186        String getMemberPath();
187    
188        /**
189         * Updates the membership data synchronously using the {@code Supplier<byte[]>} passed to
190         * {@link Group#join()}.
191         *
192         * @return the new membership data
193         * @throws UpdateException if there was a problem updating the membership data
194         */
195        byte[] updateMemberData() throws UpdateException;
196    
197        /**
198         * Cancels group membership by deleting the associated ZooKeeper member node.
199         *
200         * @throws JoinException if there is a problem deleting the node
201         */
202        void cancel() throws JoinException;
203      }
204    
205      /**
206       * Indicates an error joining a group.
207       */
208      public static class JoinException extends Exception {
209        public JoinException(String message, Throwable cause) {
210          super(message, cause);
211        }
212      }
213    
214      /**
215       * Indicates an error updating a group member's data.
216       */
217      public static class UpdateException extends Exception {
218        public UpdateException(String message, Throwable cause) {
219          super(message, cause);
220        }
221      }
222    
223      /**
224       * Equivalent to calling {@code join(null, null)}.
225       */
226      public final Membership join() throws JoinException, InterruptedException {
227        return join(NO_MEMBER_DATA, null);
228      }
229    
230      /**
231       * Equivalent to calling {@code join(memberData, null)}.
232       */
233      public final Membership join(Supplier<byte[]> memberData)
234          throws JoinException, InterruptedException {
235    
236        return join(memberData, null);
237      }
238    
239      /**
240       * Equivalent to calling {@code join(null, onLoseMembership)}.
241       */
242      public final Membership join(@Nullable final Command onLoseMembership)
243          throws JoinException, InterruptedException {
244    
245        return join(NO_MEMBER_DATA, onLoseMembership);
246      }
247    
248      /**
249       * Joins this group and returns the resulting Membership when successful.  Membership will be
250       * automatically cancelled when the current jvm process dies; however the returned Membership
251       * object can be used to cancel membership earlier.  Unless
252       * {@link com.twitter.common.zookeeper.Group.Membership#cancel()} is called the membership will
253       * be maintained by re-establishing it silently in the background.
254       *
255       * <p>Any {@code memberData} given is persisted in the member node in ZooKeeper.  If an
256       * {@code onLoseMembership} callback is supplied, it will be notified each time this member loses
257       * membership in the group.
258       *
259       * @param memberData a supplier of the data to store in the member node
260       * @param onLoseMembership a callback to notify when membership is lost
261       * @return a Membership object with the member details
262       * @throws JoinException if there was a problem joining the group
263       * @throws InterruptedException if this thread is interrupted awaiting completion of the join
264       */
265      public final Membership join(Supplier<byte[]> memberData, @Nullable Command onLoseMembership)
266          throws JoinException, InterruptedException {
267    
268        Preconditions.checkNotNull(memberData);
269        ensurePersistentGroupPath();
270    
271        final ActiveMembership groupJoiner = new ActiveMembership(memberData, onLoseMembership);
272        return backoffHelper.doUntilResult(new ExceptionalSupplier<Membership, JoinException>() {
273          @Override public Membership get() throws JoinException {
274            try {
275              return groupJoiner.join();
276            } catch (InterruptedException e) {
277              Thread.currentThread().interrupt();
278              throw new JoinException("Interrupted trying to join group at path: " + path, e);
279            } catch (ZooKeeperConnectionException e) {
280              LOG.log(Level.WARNING, "Temporary error trying to join group at path: " + path, e);
281              return null;
282            } catch (KeeperException e) {
283              if (zkClient.shouldRetry(e)) {
284                LOG.log(Level.WARNING, "Temporary error trying to join group at path: " + path, e);
285                return null;
286              } else {
287                throw new JoinException("Problem joining partition group at path: " + path, e);
288              }
289            }
290          }
291        });
292      }
293    
294      private void ensurePersistentGroupPath() throws JoinException, InterruptedException {
295        backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, JoinException>() {
296          @Override public Boolean get() throws JoinException {
297            try {
298              ZooKeeperUtils.ensurePath(zkClient, acl, path);
299              return true;
300            } catch (InterruptedException e) {
301              Thread.currentThread().interrupt();
302              throw new JoinException("Interrupted trying to ensure group at path: " + path, e);
303            } catch (ZooKeeperConnectionException e) {
304              LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e);
305              return false;
306            } catch (KeeperException e) {
307              if (zkClient.shouldRetry(e)) {
308                LOG.log(Level.WARNING, "Temporary error ensuring path: " + path, e);
309                return false;
310              } else {
311                throw new JoinException("Problem ensuring group at path: " + path, e);
312              }
313            }
314          }
315        });
316      }
317    
318      private class ActiveMembership implements Membership {
319        private final Supplier<byte[]> memberData;
320        private final Command onLoseMembership;
321        private String nodePath;
322        private String memberId;
323        private volatile boolean cancelled;
324        private byte[] membershipData;
325    
326        public ActiveMembership(Supplier<byte[]> memberData, @Nullable Command onLoseMembership) {
327          this.memberData = memberData;
328          this.onLoseMembership = (onLoseMembership == null) ? Commands.NOOP : onLoseMembership;
329        }
330    
331        @Override
332        public String getGroupPath() {
333          return path;
334        }
335    
336        @Override
337        public synchronized String getMemberId() {
338          return memberId;
339        }
340    
341        @Override
342        public synchronized String getMemberPath() {
343          return nodePath;
344        }
345    
346        @Override
347        public synchronized byte[] updateMemberData() throws UpdateException {
348          byte[] membershipData = memberData.get();
349          if (!ArrayUtils.isEquals(this.membershipData, membershipData)) {
350            try {
351              zkClient.get().setData(nodePath, membershipData, ZooKeeperUtils.ANY_VERSION);
352              this.membershipData = membershipData;
353            } catch (KeeperException e) {
354              throw new UpdateException("Problem updating membership data.", e);
355            } catch (InterruptedException e) {
356              throw new UpdateException("Interrupted attempting to update membership data.", e);
357            } catch (ZooKeeperConnectionException e) {
358              throw new UpdateException(
359                  "Could not connect to the ZooKeeper cluster to update membership data.", e);
360            }
361          }
362          return membershipData;
363        }
364    
365        @Override
366        public synchronized void cancel() throws JoinException {
367          if (!cancelled) {
368            try {
369              backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, JoinException>() {
370                @Override public Boolean get() throws JoinException {
371                  try {
372                    zkClient.get().delete(nodePath, ZooKeeperUtils.ANY_VERSION);
373                    return true;
374                  } catch (InterruptedException e) {
375                    Thread.currentThread().interrupt();
376                    throw new JoinException("Interrupted trying to cancel membership: " + nodePath, e);
377                  } catch (ZooKeeperConnectionException e) {
378                    LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e);
379                    return false;
380                  } catch (NoNodeException e) {
381                    LOG.info("Membership already cancelled, node at path: " + nodePath +
382                             " has been deleted");
383                    return true;
384                  } catch (KeeperException e) {
385                    if (zkClient.shouldRetry(e)) {
386                      LOG.log(Level.WARNING, "Temporary error cancelling membership: " + nodePath, e);
387                      return false;
388                    } else {
389                      throw new JoinException("Problem cancelling membership: " + nodePath, e);
390                    }
391                  }
392                }
393              });
394              cancelled = true; // Prevent auto-re-join logic from undoing this cancel.
395            } catch (InterruptedException e) {
396              Thread.currentThread().interrupt();
397              throw new JoinException("Problem cancelling membership: " + nodePath, e);
398            }
399          }
400        }
401    
402        private class CancelledException extends IllegalStateException { /* marker */ }
403    
404        synchronized Membership join()
405            throws ZooKeeperConnectionException, InterruptedException, KeeperException {
406    
407          if (cancelled) {
408            throw new CancelledException();
409          }
410    
411          if (nodePath == null) {
412            // Re-join if our ephemeral node goes away due to session expiry - only needs to be
413            // registered once.
414            zkClient.registerExpirationHandler(new Command() {
415              @Override public void execute() {
416                tryJoin();
417              }
418            });
419          }
420    
421          byte[] membershipData = memberData.get();
422          nodePath = zkClient.get().create(path + "/" + nodeNamePrefix, membershipData, acl,
423              CreateMode.EPHEMERAL_SEQUENTIAL);
424          memberId = extractMemberId(nodePath);
425          LOG.info("Set group member ID to " + memberId);
426          this.membershipData = membershipData;
427    
428          // Re-join if our ephemeral node goes away due to maliciousness.
429          zkClient.get().exists(nodePath, new Watcher() {
430            @Override public void process(WatchedEvent event) {
431              if (event.getType() == EventType.NodeDeleted) {
432                LOG.info("Member ID deleted. Rejoining. Event: " + event);
433                tryJoin();
434              }
435            }
436          });
437    
438          return this;
439        }
440    
441        private final ExceptionalSupplier<Boolean, InterruptedException> tryJoin =
442            new ExceptionalSupplier<Boolean, InterruptedException>() {
443              @Override public Boolean get() throws InterruptedException {
444                try {
445                  join();
446                  return true;
447                } catch (CancelledException e) {
448                  // Lost a cancel race - that's ok.
449                  return true;
450                } catch (ZooKeeperConnectionException e) {
451                  LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e);
452                  return false;
453                } catch (KeeperException e) {
454                  if (zkClient.shouldRetry(e)) {
455                    LOG.log(Level.WARNING, "Temporary error re-joining group: " + path, e);
456                    return false;
457                  } else {
458                    throw new IllegalStateException("Permanent problem re-joining group: " + path, e);
459                  }
460                }
461              }
462            };
463    
464        private synchronized void tryJoin() {
465          onLoseMembership.execute();
466          try {
467            backoffHelper.doUntilSuccess(tryJoin);
468          } catch (InterruptedException e) {
469            Thread.currentThread().interrupt();
470            throw new RuntimeException(
471                String.format("Interrupted while trying to re-join group: %s, giving up", path), e);
472          }
473        }
474      }
475    
476      /**
477       * An interface to an object that listens for changes to a group's membership.
478       */
479      public interface GroupChangeListener {
480    
481        /**
482         * Called whenever group membership changes with the new list of member ids.
483         *
484         * @param memberIds the current member ids
485         */
486        void onGroupChange(Iterable<String> memberIds);
487      }
488    
489      /**
490       * Indicates an error watching a group.
491       */
492      public static class WatchException extends Exception {
493        public WatchException(String message, Throwable cause) {
494          super(message, cause);
495        }
496      }
497    
498      /**
499       * Watches this group for the lifetime of this jvm process.  This method will block until the
500       * current group members are available, notify the {@code groupChangeListener} and then return.
501       * All further changes to the group membership will cause notifications on a background thread.
502       *
503       * @param groupChangeListener the listener to notify of group membership change events
504       * @throws WatchException if there is a problem generating the 1st group membership list
505       * @throws InterruptedException if interrupted waiting to gather the 1st group membership list
506       */
507      public final void watch(final GroupChangeListener groupChangeListener)
508          throws WatchException, InterruptedException {
509        Preconditions.checkNotNull(groupChangeListener);
510    
511        try {
512          ensurePersistentGroupPath();
513        } catch (JoinException e) {
514          throw new WatchException("Failed to create group path: " + path, e);
515        }
516    
517        final GroupMonitor groupMonitor = new GroupMonitor(groupChangeListener);
518        backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, WatchException>() {
519          @Override public Boolean get() throws WatchException {
520            try {
521              groupMonitor.watchGroup();
522              return true;
523            } catch (InterruptedException e) {
524              Thread.currentThread().interrupt();
525              throw new WatchException("Interrupted trying to watch group at path: " + path, e);
526            } catch (ZooKeeperConnectionException e) {
527              LOG.log(Level.WARNING, "Temporary error trying to watch group at path: " + path, e);
528              return null;
529            } catch (KeeperException e) {
530              if (zkClient.shouldRetry(e)) {
531                LOG.log(Level.WARNING, "Temporary error trying to watch group at path: " + path, e);
532                return null;
533              } else {
534                throw new WatchException("Problem trying to watch group at path: " + path, e);
535              }
536            }
537          }
538        });
539      }
540    
541      /**
542       * Helps continuously monitor a group for membership changes.
543       */
544      private class GroupMonitor {
545        private final GroupChangeListener groupChangeListener;
546        private Set<String> members;
547    
548        GroupMonitor(GroupChangeListener groupChangeListener) {
549          this.groupChangeListener = groupChangeListener;
550        }
551    
552        private final Watcher groupWatcher = new Watcher() {
553          @Override public final void process(WatchedEvent event) {
554            if (event.getType() == EventType.NodeChildrenChanged) {
555              tryWatchGroup();
556            }
557          }
558        };
559    
560        private final ExceptionalSupplier<Boolean, InterruptedException> tryWatchGroup =
561            new ExceptionalSupplier<Boolean, InterruptedException>() {
562              @Override public Boolean get() throws InterruptedException {
563                try {
564                  watchGroup();
565                  return true;
566                } catch (ZooKeeperConnectionException e) {
567                  LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e);
568                  return false;
569                } catch (KeeperException e) {
570                  if (zkClient.shouldRetry(e)) {
571                    LOG.log(Level.WARNING, "Temporary error re-watching group: " + path, e);
572                    return false;
573                  } else {
574                    throw new IllegalStateException("Permanent problem re-watching group: " + path, e);
575                  }
576                }
577              }
578            };
579    
580        private void tryWatchGroup() {
581          try {
582            backoffHelper.doUntilSuccess(tryWatchGroup);
583          } catch (InterruptedException e) {
584            Thread.currentThread().interrupt();
585            throw new RuntimeException(
586                String.format("Interrupted while trying to re-watch group: %s, giving up", path), e);
587          }
588        }
589    
590        private void watchGroup()
591            throws ZooKeeperConnectionException, InterruptedException, KeeperException {
592    
593          List<String> children = zkClient.get().getChildren(path, groupWatcher);
594          setMembers(Iterables.filter(children, nodeNameFilter));
595        }
596    
597        synchronized void setMembers(Iterable<String> members) {
598          if (this.members == null) {
599            // Reset our watch on the group if session expires - only needs to be registered once.
600            zkClient.registerExpirationHandler(new Command() {
601              @Override public void execute() {
602                tryWatchGroup();
603              }
604            });
605          }
606    
607          Set<String> membership = ImmutableSet.copyOf(members);
608          if (!membership.equals(this.members)) {
609            groupChangeListener.onGroupChange(members);
610            this.members = membership;
611          }
612        }
613      }
614    
615      @Override
616      public String toString() {
617        return "Group " + path;
618      }
619    }