001    // =================================================================================================
002    // Copyright 2012 Twitter, Inc.
003    // -------------------------------------------------------------------------------------------------
004    // Licensed under the Apache License, Version 2.0 (the "License");
005    // you may not use this work except in compliance with the License.
006    // You may obtain a copy of the License in the LICENSE file, or at:
007    //
008    //  http://www.apache.org/licenses/LICENSE-2.0
009    //
010    // Unless required by applicable law or agreed to in writing, software
011    // distributed under the License is distributed on an "AS IS" BASIS,
012    // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013    // See the License for the specific language governing permissions and
014    // limitations under the License.
015    // =================================================================================================
016    
017    package com.twitter.common.zookeeper.guice;
018    
019    import java.lang.annotation.Retention;
020    import java.lang.annotation.Target;
021    import java.net.InetSocketAddress;
022    import java.util.Map;
023    import java.util.concurrent.atomic.AtomicReference;
024    import java.util.logging.Level;
025    import java.util.logging.Logger;
026    
027    import static java.lang.annotation.ElementType.FIELD;
028    import static java.lang.annotation.ElementType.METHOD;
029    import static java.lang.annotation.ElementType.PARAMETER;
030    import static java.lang.annotation.RetentionPolicy.RUNTIME;
031    
032    import javax.annotation.Nullable;
033    
034    import com.google.common.base.Optional;
035    import com.google.common.base.Preconditions;
036    import com.google.common.util.concurrent.Atomics;
037    import com.google.inject.BindingAnnotation;
038    import com.google.inject.AbstractModule;
039    import com.google.inject.Inject;
040    import com.google.inject.Provides;
041    import com.google.inject.Singleton;
042    import com.google.inject.TypeLiteral;
043    
044    import com.twitter.common.application.ShutdownRegistry;
045    import com.twitter.common.application.modules.LifecycleModule;
046    import com.twitter.common.application.modules.LocalServiceRegistry;
047    import com.twitter.common.args.Arg;
048    import com.twitter.common.args.CmdLine;
049    import com.twitter.common.args.constraints.NotEmpty;
050    import com.twitter.common.args.constraints.NotNull;
051    import com.twitter.common.base.Command;
052    import com.twitter.common.base.ExceptionalCommand;
053    import com.twitter.common.base.Supplier;
054    import com.twitter.common.zookeeper.Group.JoinException;
055    import com.twitter.common.zookeeper.ServerSet;
056    import com.twitter.common.zookeeper.ServerSet.EndpointStatus;
057    import com.twitter.common.zookeeper.ServerSet.UpdateException;
058    import com.twitter.common.zookeeper.ServerSetImpl;
059    import com.twitter.common.zookeeper.ZooKeeperClient;
060    import com.twitter.thrift.Status;
061    
062    import static com.google.common.base.Preconditions.checkNotNull;
063    
064    /**
065     * A module that registers all ports in the {@link LocalServiceRegistry} in an {@link ServerSet}.
066     *
067     * Required bindings:
068     * <ul>
069     *   <li> {@link ZooKeeperClient}
070     *   <li> {@link ShutdownRegistry}
071     *   <li> {@link LocalServiceRegistry}
072     * </ul>
073     *
074     * {@link LifecycleModule} must also be included by users so a startup action may be registered.
075     *
076     * Provided bindings:
077     * <ul>
078     *   <li> {@link Supplier<EndpointStatus>}
079     * </ul>
080     *
081     * @author William Farner
082     */
083    public class ServerSetModule extends AbstractModule {
084    
085      /**
086       * BindingAnnotation for defaults to use in the service instance node.
087       */
088      @BindingAnnotation @Target({ PARAMETER, METHOD, FIELD }) @Retention(RUNTIME)
089      private @interface Default { }
090    
091      @NotNull
092      @NotEmpty
093      @CmdLine(name = "serverset_path", help = "ServerSet registration path")
094      protected static final Arg<String> SERVERSET_PATH = Arg.create(null);
095    
096      @CmdLine(name = "aux_port_as_primary",
097          help = "Name of the auxiliary port to use as the primary port in the server set."
098              + " This may only be used when no other primary port is specified.")
099      private static final Arg<String> AUX_PORT_AS_PRIMARY = Arg.create(null);
100    
101      private static final Logger LOG = Logger.getLogger(ServerSetModule.class.getName());
102    
103      private final Status initialStatus;
104      private final Optional<String> auxPortAsPrimary;
105    
106      /**
107       * Calls {@link #ServerSetModule(Optional)} with an absent value.
108       */
109      public ServerSetModule() {
110        this(Optional.<String>absent());
111      }
112    
113      /**
114       * Calls {@link #ServerSetModule(Status, Optional)} with initial status {@link Status#ALIVE}.
115       *
116       * @param auxPortAsPrimary Name of the auxiliary port to use as the primary port.
117       */
118      public ServerSetModule(Optional<String> auxPortAsPrimary) {
119        this(Status.ALIVE, auxPortAsPrimary);
120      }
121    
122      /**
123       * Constructs a ServerSetModule that registers a startup action that registers this process in
124       * ZooKeeper, with the specified initial Status.
125       *
126       * @param initialStatus initial Status to report to ZooKeeper.
127       */
128      public ServerSetModule(Status initialStatus) {
129        this(initialStatus, Optional.<String>absent());
130      }
131    
132      /**
133       * Constructs a ServerSetModule that registers a startup action to register this process in
134       * ZooKeeper, with the specified initial status and auxiliary port to represent as the primary
135       * service port.
136       *
137       * @param initialStatus initial Status to report to ZooKeeper.
138       * @param auxPortAsPrimary Name of the auxiliary port to use as the primary port.
139       */
140      public ServerSetModule(Status initialStatus, Optional<String> auxPortAsPrimary) {
141        this.initialStatus = Preconditions.checkNotNull(initialStatus);
142        this.auxPortAsPrimary = Preconditions.checkNotNull(auxPortAsPrimary);
143      }
144    
145      @Override
146      protected void configure() {
147        requireBinding(ZooKeeperClient.class);
148        requireBinding(ShutdownRegistry.class);
149        requireBinding(LocalServiceRegistry.class);
150        LifecycleModule.bindStartupAction(binder(), ServerSetJoiner.class);
151    
152        bind(new TypeLiteral<Supplier<EndpointStatus>>() { }).to(EndpointSupplier.class);
153        bind(EndpointSupplier.class).in(Singleton.class);
154        bind(Status.class).annotatedWith(Default.class).toInstance(initialStatus);
155    
156        Optional<String> primaryPortName;
157        if (AUX_PORT_AS_PRIMARY.hasAppliedValue()) {
158          primaryPortName = Optional.of(AUX_PORT_AS_PRIMARY.get());
159        } else {
160          primaryPortName = auxPortAsPrimary;
161        }
162    
163        bind(new TypeLiteral<Optional<String>>() { }).annotatedWith(Default.class)
164            .toInstance(primaryPortName);
165      }
166    
167      @Provides
168      @Singleton
169      ServerSet provideServerSet(ZooKeeperClient zkClient) {
170        return new ServerSetImpl(zkClient, SERVERSET_PATH.get());
171      }
172    
173      static class EndpointSupplier implements Supplier<EndpointStatus> {
174        private final AtomicReference<EndpointStatus> reference = Atomics.newReference();
175    
176        @Nullable
177        @Override public EndpointStatus get() {
178          return reference.get();
179        }
180    
181        void set(EndpointStatus endpoint) {
182          reference.set(endpoint);
183        }
184      }
185    
186      private static class ServerSetJoiner implements Command {
187        private final ServerSet serverSet;
188        private final LocalServiceRegistry serviceRegistry;
189        private final ShutdownRegistry shutdownRegistry;
190        private final EndpointSupplier endpointSupplier;
191        private final Status initialStatus;
192        private final Optional<String> auxPortAsPrimary;
193    
194        @Inject
195        ServerSetJoiner(
196            ServerSet serverSet,
197            LocalServiceRegistry serviceRegistry,
198            ShutdownRegistry shutdownRegistry,
199            EndpointSupplier endpointSupplier,
200            @Default Status initialStatus,
201            @Default Optional<String> auxPortAsPrimary) {
202    
203          this.serverSet = checkNotNull(serverSet);
204          this.serviceRegistry = checkNotNull(serviceRegistry);
205          this.shutdownRegistry = checkNotNull(shutdownRegistry);
206          this.endpointSupplier = checkNotNull(endpointSupplier);
207          this.initialStatus = checkNotNull(initialStatus);
208          this.auxPortAsPrimary = checkNotNull(auxPortAsPrimary);
209        }
210    
211        @Override public void execute() {
212          Optional<InetSocketAddress> primarySocket = serviceRegistry.getPrimarySocket();
213          Map<String, InetSocketAddress> auxSockets = serviceRegistry.getAuxiliarySockets();
214    
215          InetSocketAddress primary;
216          if (primarySocket.isPresent()) {
217            primary = primarySocket.get();
218          } else if (auxPortAsPrimary.isPresent()) {
219            primary = auxSockets.get(auxPortAsPrimary.get());
220            if (primary == null) {
221              throw new IllegalStateException("No auxiliary port named " + auxPortAsPrimary.get());
222            }
223          } else {
224            throw new IllegalStateException("No primary service registered with LocalServiceRegistry,"
225                + " and -aux_port_as_primary was not specified.");
226          }
227    
228          final EndpointStatus endpointStatus;
229          try {
230            endpointStatus = serverSet.join(primary, auxSockets, initialStatus);
231            endpointSupplier.set(endpointStatus);
232          } catch (JoinException e) {
233            LOG.log(Level.WARNING, "Failed to join ServerSet.", e);
234            throw new RuntimeException(e);
235          } catch (InterruptedException e) {
236            LOG.log(Level.WARNING, "Interrupted while joining ServerSet.", e);
237            Thread.currentThread().interrupt();
238            throw new RuntimeException(e);
239          }
240    
241          shutdownRegistry.addAction(new ExceptionalCommand<UpdateException>() {
242            @Override public void execute() throws UpdateException {
243              LOG.info("Leaving ServerSet.");
244              endpointStatus.update(Status.DEAD);
245            }
246          });
247        }
248      }
249    }