Reactor is a foundation for asynchronous applications on the JVM. It provides abstractions for Java, Groovy and other JVM languages to make building event and data-driven applications easier. It’s also really fast. On modest hardware, it’s possible to process around 15,000,000 events per second with the fastest non-blocking Dispatcher. Other dispatchers are available to provide the developer with a range of choices from thread-pool style, long-running task execution to non-blocking, high-volume task dispatching.
Build Instructions
Reactor uses a Gradle-based build system. Building the code yourself should be a straightforward case of:
git clone git@github.com:reactor/reactor.git
cd reactor
./gradlew test
This should cause the submodules to be compiled and the tests to be run. To install these artifacts to your local Maven repo, use the handy Gradle Maven plugin:
./gradlew install
Installation
Maven artifacts are available in the SpringSource snapshot and milestone repositories. To use Reactor in your own project, add these repositories to your build file’s repository definitions. For a Gradle project, this would be:
ext {
reactorVersion = '1.1.0.RELEASE'
}
repositories {
maven { url 'http://repo.springsource.org/libs-release' }
}
dependencies {
// Reactor Core
compile "org.projectreactor:reactor-core:$reactorVersion"
// Reactor Groovy
compile "org.projectreactor:reactor-groovy:$reactorVersion"
// Reactor Spring
compile "org.projectreactor.spring:reactor-spring-context:$reactorVersion"
}
Overview
Reactor is fundamentally event-driven and reactive. It provides abstractions to facilitate publishing and consuming events on Reactors. A Reactor can be backed by a number of different Dispatcher implementations so that tasks assigned to respond to certain events can be configured to be run on a single thread as in an Actor or purely Reactor pattern, on a thread chosen from a thread pool, or queued in an LMAX Disruptor RingBuffer. Which Dispatcher implementation is needed depends on the kind of work being done: blocking IO operations should be performed in a pooled thread, and fast, non-blocking computations should probably be executed on a RingBuffer.
Getting Started
Each Reactor you create needs a Dispatcher to execute tasks. By default, with no configuration, you’ll get a synchronous Dispatcher. This works fine for testing but is probably not what you want for a real application.
Since it’s not desirable to create too many threads in an asynchronous application, and something has to keep track of those few Dispatchers that are divvied out to the components that need them, you need to instantiate an Environment, which will create those Dispatchers based on either the default configuration (provided in a properties file in the Reactor JAR file) or your own configuration.
The Environment
There’s no magic to it. You simply "new" up an instance of Environment and, when creating Reactors, Streams, and Promises, pass a reference to this Environment into the Specs (essentially a "builder" helper class). The Environment instance is where the reactor.-prefixed system properties live, and it is also home to the small number of Dispatchers intended to be shared by any component in your application that needs one.
You can, of course, create Dispatchers directly in your code. There may be times (like embedding in other threading models) where that’s desirable. But in general, you should refrain from directly instantiating your own Dispatchers and instead use those configured to be created by the Environment.
Events, Selectors and Consumers
Three of the most foundational components in Reactor’s reactor-core module are the Selector, the Consumer, and the Event. A Consumer can be assigned to a Reactor by using a Selector, which is used to find the Consumers to invoke for an Event. A range of default selectors is available, from plain Strings to regular expressions to Spring MVC-style URL templates.
Selector Matching
There are different kinds of Selectors for doing different kinds of matching. The simplest form is just to match one object with another. For example, a Selector created from a String "parse" will match another Selector whose wrapped object is also a String "parse" (in this case it’s just like a String.equals(String)).
But a Selector can also match another Selector based on Class.isAssignableFrom(Class<?>), regular expressions, URL templates, or the like. There are helper methods on the Selectors abstract class to make creating these Selectors very easy in user code.
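The matching rules described above can be illustrated without Reactor on the classpath. The following is a minimal plain-Java sketch; the class and method names are hypothetical and are not part of Reactor's API, and treating a regular expression as a whole-key match is an assumption made for the example.

```java
import java.util.regex.Pattern;

// Plain-Java illustration of three matching strategies. These helpers are
// hypothetical and only mimic the idea; they are not Reactor classes.
final class SelectorMatchSketch {

    // Object matching: the key matches when it equals the wrapped object.
    static boolean matchesObject(Object wrapped, Object key) {
        return wrapped.equals(key);
    }

    // Class matching: the key's type must be assignable to the wrapped type.
    static boolean matchesClass(Class<?> wrapped, Class<?> keyType) {
        return wrapped.isAssignableFrom(keyType);
    }

    // Regex matching (assumption: the whole key has to match the pattern).
    static boolean matchesRegex(Pattern pattern, String key) {
        return pattern.matcher(key).matches();
    }

    public static void main(String[] args) {
        System.out.println(matchesObject("parse", "parse"));
        System.out.println(matchesClass(RuntimeException.class, IllegalArgumentException.class));
        System.out.println(matchesRegex(Pattern.compile("topic\\.([a-z0-9]+)"), "topic.a1"));
    }
}
```

Class matching is what makes it possible to register a handler for a broad exception type and still receive events keyed by a more specific subclass.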
Here’s an example of wiring a Consumer to a Selector on a Reactor:
// This helper method is like jQuery’s.
// It just creates a Selector instance so you don’t have to "new" one up.
import static reactor.event.selector.Selectors.$;
Environment env = new Environment();
// This factory call creates a Reactor.
Reactor reactor = Reactors.reactor()
.env(env) // our current Environment
.dispatcher(Environment.EVENT_LOOP) // use one of the BlockingQueueDispatchers
.get(); // get the object when finished configuring
// Register a consumer to handle events sent with a key that matches "parse"
reactor.on($("parse"),new Consumer<Event<String>>() {
@Override
public void accept(Event<String> ev) {
System.out.println("Received event with data: " + ev.getData());
}
});
// Send an event to this Reactor and trigger all actions that match the given key
reactor.notify("parse",Event.wrap("data"));
In Java 8, the event wiring would become extremely succinct:
// Use a POJO as an event handler
class Service {
public <T> void handleEvent(Event<T> ev) {
// handle the event data
}
}
// Use a method reference to create a Consumer<Event<T>>
reactor.on($("parse"),service::handleEvent);
// Notify consumers of the 'parse' topic that data is ready
// by passing a Supplier<Event<T>> in the form of a lambda
// (slurpNextEvent() stands in for whatever produces the next Event<T>)
reactor.notify("parse", () -> slurpNextEvent());
Event Headers
Events have optional associated metadata in the headers property. Events are meant to be stateless helpers that provide a consumer with an argument value and related metadata. If you need to communicate information to consumer components, like the IDs of other Reactors that have an interest in the outcome of the work done on this Event, then set that information in a header value.
// Just use the default selector instead of a specific one
r.on(new Consumer<Event<String>>() {
public void accept(Event<String> ev) {
String otherData = ev.getHeaders().get("x-custom-header");
// do something with this other data
}
});
Event<String> ev = Event.wrap("Hello World!");
ev.getHeaders().set("x-custom-header","ID_TO_ANOTHER_REACTOR");
r.notify(ev);
Registrations
When assigning a Consumer to a Reactor, a Registration is provided to the caller to manage that assignment. Registrations can be cancelled, which removes them from the Reactor. If you don’t want to remove a consumer entirely but just want to pause its execution for a time, you can call pause() and later resume(). While a Registration is paused, the Dispatcher skips over that Consumer when finding Consumers that match a given Selector.
Registration reg = r.on($("test"),new Consumer<Event<?>>() { … });
// pause this consumer so it's not executed for a time
reg.pause();
// later decide to resume it
reg.resume();
Routing
Reactor includes five different kinds of routing for assigned consumers: broadcast, random, round-robin, first, and last. This means that, of the given Consumers assigned to the same Selector, the routing type determines which ones are invoked.
Usage Guide
Creating an Environment
An Environment is aware of active "profiles" (the default profile is named "default"). The default environment properties reader will look for properties files in the classpath at location META-INF/reactor/default.properties and, if the system property reactor.profiles.active is set to a comma-separated list of profile names, the properties reader will look for a META-INF/reactor/$PROFILE.properties file for each profile referenced.
The Environment maintains references to the Dispatchers created from the profile’s properties file. In the default configuration, only one Environment instance is needed per JVM to keep the number of threads used by the Dispatchers relatively small.
final Environment env = new Environment();
Pass a reference to this Environment when creating components that require a Dispatcher.
// Reactors are created using a ReactorSpec obtained via factory method
Reactor r = Reactors.reactor().env(env).get();
Handling Events
To assign a handler for a specific event, a Selector is required to do the matching.
reactor.on($("topic"),new Consumer<Event<Message>>() { ... });
// if you don't like the $,use the `object()` method
reactor.on(Selectors.object("topic"),new Consumer<Event<Message>>() { ... });
To notify the Reactor of an available Event, call the notify() method:
Message msg = msgService.nextMessage();
reactor.notify("topic",Event.wrap(msg));
Several types of built-in Selectors are available to match by Object, Class, RegEx, or URI template.
It’s common in Reactor applications to publish errors by the exception class. To handle errors this way, use the ClassSelector.
// T() is a static helper function on the Selectors object to create a ClassSelector
reactor.on(T(IllegalArgumentException.class),new Consumer<Event<IllegalArgumentException>>() { ... });
Errors will be published using the exception class type as the key:
try {
// do something that might generate an exception
} catch (Throwable t) {
reactor.notify(t.getClass(),Event.wrap(t));
}
Topic selection can be done with regular expressions.
// R() is a static helper function on the Selectors object to create a RegexSelector
reactor.on(R("topic.([a-z0-9]+)"),new Consumer<Event<Message>>() { ... });
// ...or...
reactor.on(Selectors.regex("topic.([a-z0-9]+)"),new Consumer<Event<Message>>() { ... });
Selection can also be done with URI templates. Values captured by the template placeholders are available as Event headers.
// U() is a static helper method on the Selectors object to create a UriTemplateSelector
reactor.on(U("/topic/{name}"),new Consumer<Event<Message>>() {
public void accept(Event<Message> ev) {
String name = ev.getHeaders().get("name");
}
});
// ...or...
reactor.on(Selectors.uri("/topic/{name}"),new Consumer<Event<Message>>() { ... });
Event Routing
Reactors send events to Consumers based on the EventRouter in use. By default, no Consumers are filtered out of the selection, which means all Consumers subscribed to a given Selector will be invoked. A couple of built-in filters provide round-robin and random filtering of Consumers, so that if multiple Consumers are subscribed to the same Selector, they will not all be invoked (as is the case with the default PassThroughFilter). Instead, a single Consumer will be selected in round-robin fashion. Similarly, using a RandomFilter will randomly select a Consumer.
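The difference between pass-through and round-robin selection can be sketched in a few lines of plain Java. This is only an illustration of the idea under stated assumptions: the class and method names are hypothetical and do not correspond to Reactor's filter classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of two consumer-selection strategies.
final class RoutingSketch {

    // Pass-through: every subscribed consumer is selected.
    static <T> List<T> passThrough(List<T> consumers) {
        return new ArrayList<T>(consumers);
    }

    // Round-robin: exactly one consumer per event, cycling through the list.
    static final class RoundRobin {
        private final AtomicInteger next = new AtomicInteger();

        <T> List<T> select(List<T> consumers) {
            if (consumers.isEmpty()) {
                return Collections.emptyList();
            }
            // floorMod keeps the index valid even after the counter overflows
            int index = Math.floorMod(next.getAndIncrement(), consumers.size());
            return Collections.singletonList(consumers.get(index));
        }
    }

    public static void main(String[] args) {
        List<String> consumers = new ArrayList<String>();
        consumers.add("a");
        consumers.add("b");
        RoundRobin roundRobin = new RoundRobin();
        System.out.println(passThrough(consumers));
        System.out.println(roundRobin.select(consumers));
        System.out.println(roundRobin.select(consumers));
    }
}
```

A random strategy would have the same shape, picking the index from a random number generator instead of a counter.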
The Reactor Environment
Before you can do anything useful with Reactor, you need to create a long-lived Environment instance which holds the configured Dispatchers and properties that influence how Reactor components behave. Each instance of an Environment will contain its own set of Dispatchers, so be careful how many instances of these you create. If possible, only one instance per JVM is recommended.
Create a new Environment using the default properties file configuration reader, which reads the configuration from META-INF/reactor/default.properties:
Environment env = new Environment();
When you create Reactors, Promises, and other components that require a Dispatcher, pass this Environment instance to the Spec for that component. For example, to reference this Environment when creating Reactors:
Reactor r = Reactors.reactor()
.env(env)
.get();
Dispatchers
A default set of Dispatchers is created, and they should cover the majority of use cases. But it might be the case that you don’t need a particular type of Dispatcher at all (say you’re only using the RingBufferDispatcher), so you want to prevent that type of Dispatcher from being configured. You’d simply make a copy of the Reactor default.properties and set the .type of the Dispatcher you want to turn off to a blank.
By default, a ThreadPoolExecutorDispatcher is configured.
reactor.dispatchers.threadPoolExecutor.type = threadPoolExecutor
reactor.dispatchers.threadPoolExecutor.size = 0
# Backlog is how many Task objects to warm up internally
reactor.dispatchers.threadPoolExecutor.backlog = 1024
If you don’t need this Dispatcher, then set the property reactor.dispatchers.threadPoolExecutor.type to a blank:
reactor.dispatchers.threadPoolExecutor.type =
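To make the "blank type" convention concrete, here is a small plain-Java sketch of how a configuration reader could decide whether a Dispatcher is switched off. It is not Reactor's actual reader: the class and method names are hypothetical, and the `ringBuffer` entry is an assumed key that merely follows the naming pattern shown above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical reader-side check; Reactor's real implementation may differ.
final class DispatcherConfigSketch {

    // A dispatcher counts as enabled when its .type property is present and non-blank.
    static boolean isEnabled(Properties props, String name) {
        String type = props.getProperty("reactor.dispatchers." + name + ".type");
        return type != null && !type.trim().isEmpty();
    }

    // Parses properties-file text held in memory.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected for in-memory input
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse(
            "reactor.dispatchers.threadPoolExecutor.type =\n"
            + "reactor.dispatchers.ringBuffer.type = ringBuffer\n"); // assumed key
        System.out.println(isEnabled(props, "threadPoolExecutor"));
        System.out.println(isEnabled(props, "ringBuffer"));
    }
}
```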
Profiles
If you don’t want to change the default properties, you can activate a "profile". To activate a profile named "testing", create a properties file called META-INF/reactor/testing.properties. When you run your Reactor application, activate the profile by referencing it in the reactor.profiles.active System property:
$ java -Dreactor.profiles.active=testing -jar reactor-app-1.0.jar
If you don’t want Reactor’s default properties to be read at all, create a .properties file with the configuration you want and set the system property reactor.profiles.default to the name of the profile you want to load as the "default" profile. For example, to use the "production" profile as the default, create a properties file called META-INF/reactor/production.properties. When running your Reactor application, set the reactor.profiles.default System property to "production":
$ java -Dreactor.profiles.default=production -jar reactor-app-1.0.jar
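The lookup rules for profiles can be summarized as a small function that maps the two system properties to the classpath locations a reader would consult. This is a sketch only: the class name is hypothetical, and the ordering (default profile first, then each active profile) is an assumption rather than documented behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that lists the properties files implied by the profile settings.
final class ProfilePathsSketch {

    static List<String> propertiesFiles(String defaultProfile, String activeProfiles) {
        List<String> paths = new ArrayList<String>();
        // Fall back to the profile named "default" when no override is given.
        String base = (defaultProfile == null || defaultProfile.trim().isEmpty())
            ? "default" : defaultProfile.trim();
        paths.add("META-INF/reactor/" + base + ".properties");
        if (activeProfiles != null) {
            // reactor.profiles.active is a comma-separated list of profile names
            for (String profile : activeProfiles.split(",")) {
                String name = profile.trim();
                if (!name.isEmpty()) {
                    paths.add("META-INF/reactor/" + name + ".properties");
                }
            }
        }
        return paths;
    }

    public static void main(String[] args) {
        System.out.println(propertiesFiles(
            System.getProperty("reactor.profiles.default"),
            System.getProperty("reactor.profiles.active")));
    }
}
```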
The Default Dispatcher
When creating components that need Dispatchers, unless you specify a particular Dispatcher to use, Reactor will give your component the "default" Dispatcher. In its default configuration, that would be the "ringBuffer" Dispatcher. To change that, set the System property reactor.dispatchers.default to the name of the Dispatcher to be considered the default:
# change to the eventLoop for a default Dispatcher
reactor.dispatchers.default = eventLoop
Tuples
A Consumer only has one argument in its accept(T ob) method. But it’s often convenient to pass multiple arguments to a Consumer. Tuples are a convenience class provided by Reactor to help. Tuples are type-safe and can be used to replace custom beans with strongly-typed properties. They are similar to Scala’s Product and Tuple classes.
For example, to pass a String and a Float to an event Consumer, create a Tuple2<String, Float>.
Tuple2<String, Float> tuple = Tuple.of("Hello World!", Float.valueOf(1.0f));
reactor.notify("topic",Event.wrap(tuple));
Streams
A Stream is a stateless event processor that offers a reactive alternative to callback spaghetti programming. Values passing through a Stream can be transformed, filtered, and consumed by calling the appropriate method. Entire graphs of operations can be constructed from a Stream since many methods transparently return new Streams which start new flows of reactions.
Deferred and Composables
Streams aren’t created directly since a Stream is a consumer-side concern. The producer side is called a Deferred. The Deferred is the object that ties the producer to the consumer and it’s this object that data is passed into. A Deferred is also a Consumer, so to pass data into a Stream, call the accept(T) method.
Environment env = new Environment();
// Create a deferred for accepting stream values
Deferred<String,Stream<String>> deferred = Streams.<String>defer()
.env(env)
.dispatcher(Environment.RING_BUFFER)
.get();
Stream<String> stream = deferred.compose();
// consume values
stream.consume(new Consumer<String>() {
public void accept(String s) {
// handle string when available
}
});
// producer calls accept
deferred.accept("Hello World!");
Generally the two halves of the Deferred/Stream combination are not in the same scope. If you’re creating an API for your application, you would likely create a Deferred inside a method call and retain a reference to that Deferred so you can publish values to it asynchronously. You would then return a Stream<T> to the caller so it could interact with the values.
Composition Methods
The real power of the Stream API lies in the composition functions for transforming and filtering data.
Stream<String> filtered = stream
.map(new Function<String,String>() {
public String apply(String s) {
// turn input String into output String
return s.toLowerCase();
}
})
.filter(new Predicate<String>() {
public boolean test(String s) {
// test String
return s.startsWith("nsprefix:");
}
});
The filtered Stream now contains only lowercased values that start with "nsprefix:".
Unbounded or Batched
Streams are by default unbounded, which means they have no beginning and no end. Streams also have a method called batch(int size) which creates a new Stream that works with values in batches. In batch mode, Stream methods like first() and last() make it easy to work with the beginning and end of batches.
The first() method provides a Stream whose values are only the first of the Stream, in the case of an unbounded Stream, or the first value of each batch, if the Stream is in batch mode. The Stream returned from last() contains values from the end of a Stream (in unbounded mode, that means whenever flush() is called) or the last value in each batch if the Stream is in batch mode.
Values can be collected into a List<T> by calling Stream.collect(). This new Stream is populated by accumulated values whenever flush() is called or the configured batch size is reached.
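The batch semantics described above can be pictured with ordinary lists. The sketch below is plain Java and does not use Reactor's Stream class; the names are hypothetical, and the trailing partial batch is treated as if flush() had been called at the end of the input, which is an assumption made to keep the example short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical list-based picture of batch(size), first(), last() and collect().
final class BatchSketch {

    // Like collect() in batch mode: consecutive lists of at most `size` values.
    static <T> List<List<T>> batches(List<T> values, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> result = new ArrayList<List<T>>();
        for (int i = 0; i < values.size(); i += size) {
            int end = Math.min(i + size, values.size());
            result.add(new ArrayList<T>(values.subList(i, end)));
        }
        return result;
    }

    // Like first() in batch mode: the first value of each batch.
    static <T> List<T> firsts(List<List<T>> batches) {
        List<T> result = new ArrayList<T>();
        for (List<T> batch : batches) {
            result.add(batch.get(0));
        }
        return result;
    }

    // Like last() in batch mode: the last value of each batch.
    static <T> List<T> lasts(List<List<T>> batches) {
        List<T> result = new ArrayList<T>();
        for (List<T> batch : batches) {
            result.add(batch.get(batch.size() - 1));
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = batches(Arrays.asList(1, 2, 3, 4, 5), 2);
        System.out.println(batches);
        System.out.println(firsts(batches));
        System.out.println(lasts(batches));
    }
}
```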
Reduce
Streams can also be reduced. The reduce operation can be provided with a Supplier<T> which returns a new instance of the accumulator at the beginning of the Stream or the beginning of each batch. A Tuple2 is passed into the provided Function containing the current accumulator value and the next data element passing through the Stream. The value returned from the provided Function will be sent along again as the accumulator value the next time a data value passes through the Stream.
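The accumulator flow described above can be sketched with JDK types only. Note the substitutions: `java.util.function.BiFunction` stands in for Reactor's Function-over-Tuple2, the class name is hypothetical, and the end of the input is treated like a flush so that a partial batch still produces a result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical per-batch reduction using JDK functional interfaces.
final class ReduceSketch {

    static <T, A> List<A> reducePerBatch(List<T> values, int batchSize,
                                         Supplier<A> initial, BiFunction<A, T, A> fn) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<A> results = new ArrayList<A>();
        A acc = initial.get();            // fresh accumulator for the first batch
        int count = 0;
        for (T value : values) {
            acc = fn.apply(acc, value);   // returned value becomes the next accumulator
            if (++count == batchSize) {
                results.add(acc);
                acc = initial.get();      // fresh accumulator for the next batch
                count = 0;
            }
        }
        if (count > 0) {
            results.add(acc);             // assumption: end of input acts like flush()
        }
        return results;
    }

    public static void main(String[] args) {
        List<Integer> sums = reducePerBatch(
            Arrays.asList(1, 2, 3, 4, 5), 2, () -> 0, (acc, next) -> acc + next);
        System.out.println(sums);
    }
}
```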
Promises
A Promise is a stateful event processor that accepts either a single value or an error. Once either of these values has been accepted, the Promise is considered complete.
When a Promise is populated with a value, the onSuccess Consumers are invoked. When populated with an error, the onError Consumers are invoked. When either a success or an error is set, the onComplete Consumers are invoked.
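The three callback groups can be illustrated with a deliberately tiny stand-in written in plain Java. This is a sketch, not Reactor's Promise: the class is hypothetical, it is not thread-safe, it does no dispatching, and it does not invoke consumers that are registered after completion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical single-threaded stand-in that shows which callbacks fire when.
final class PromiseSketch<T> {
    private final List<Consumer<T>> successConsumers = new ArrayList<>();
    private final List<Consumer<Throwable>> errorConsumers = new ArrayList<>();
    private final List<Runnable> completeConsumers = new ArrayList<>();
    private boolean complete;

    PromiseSketch<T> onSuccess(Consumer<T> c) { successConsumers.add(c); return this; }
    PromiseSketch<T> onError(Consumer<Throwable> c) { errorConsumers.add(c); return this; }
    PromiseSketch<T> onComplete(Runnable r) { completeConsumers.add(r); return this; }
    boolean isComplete() { return complete; }

    // Populate with a value: success consumers first, then completion consumers.
    void accept(T value) {
        markComplete();
        for (Consumer<T> c : successConsumers) c.accept(value);
        for (Runnable r : completeConsumers) r.run();
    }

    // Populate with an error: error consumers first, then completion consumers.
    void fail(Throwable error) {
        markComplete();
        for (Consumer<Throwable> c : errorConsumers) c.accept(error);
        for (Runnable r : completeConsumers) r.run();
    }

    // Only a single value or a single error is allowed.
    private void markComplete() {
        if (complete) throw new IllegalStateException("promise is already complete");
        complete = true;
    }

    public static void main(String[] args) {
        PromiseSketch<String> promise = new PromiseSketch<>();
        promise.onSuccess(v -> System.out.println("success: " + v))
               .onError(t -> System.out.println("error: " + t))
               .onComplete(() -> System.out.println("complete"));
        promise.accept("Hello World!");
    }
}
```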
Usage
Promises aren’t created directly. They’re meant to handle future values when they are available. Promises only provide methods for setting Consumers, so you can’t publish a value to a Promise. Instead you use a Deferred, which produces a Promise from its compose() method. The Deferred has an accept() method on it which is used to populate the Promise.
Promises also need to be aware of a Dispatcher to use when publishing events. There are static helper methods on the Promises class for creating them.
// Create a deferred for accepting future values
Deferred<String,Promise<String>> deferred = Promises.<String>defer()
.env(env)
.dispatcher(Environment.RING_BUFFER)
.get();
// Set a Consumer for the value
deferred.compose().onSuccess(new Consumer<String>() {
public void accept(String s) {
// handle string when available
}
});
// When a value is available,set it on the Deferred
deferred.accept("Hello World!");
Promises can also be created in a success or error state right away if the value is already known. This is useful when a Promise should be returned from a method but the value (or error) is immediately available.
// Create a completed Promise
Promise<String> p = Promises.success("Hello World!").get();
// Consumers assigned on a complete Promise will be submitted immediately
p.onSuccess(new Consumer<String>() {
public void accept(String s) {
// handle string when available
}
});
// Create a Promise in an error state
Promise<String> failed = Promises.<String>error(new IllegalStateException()).get();
Composition
A key feature of Promises is that they are composable. The map(Function<T,V> fn) function can be used to assign a Function<T,V> that will transform the result value T into a V and populate a new Promise<V> with the return value.
Deferred<String, Promise<String>> deferred = Promises.<String>defer().get();
Promise<String> p1 = deferred.compose();
// Transform the String into a Float using map()
Promise<Float> p2 = p1.map(new Function<String, Float>() {
public Float apply(String s) {
return Float.parseFloat(s);
}
});
p2.onSuccess(new Consumer<Float>() {
public void accept(Float f) {
// handle transformed value
}
});
A Promise can also be filtered. A call to the filter(Predicate<T> p) method will return a new Promise<T> that will, depending on the Predicate check, either be populated with the result value or be in error if the Predicate test fails.
Deferred<Float, Promise<Float>> deferred = Promises.<Float>defer().get();
Promise<Float> p1 = deferred.compose();
// Filter based on a Predicate
Promise<Float> p2 = p1.filter(new Predicate<Float>() {
public boolean test(Float f) {
return f > 100f;
}
});
p2.then(
// onSuccess
new Consumer<Float>() {
public void accept(Float f) {
// handle value
}
},// onError
new Consumer<Throwable>() {
public void accept(Throwable t) {
// predicate test failed
}
}
);
Getting the value
Since Promises are designed to be stateful, they also provide methods for getting the value of the Promise. Promises are Suppliers, so a call to the get() method will return the current value of the Promise, whether it is complete or not (Promises also provide methods for checking whether the Promise is complete, a success, or in error).
Promises can block the calling thread waiting for a result. Call one of the await() methods to block until a value is available or the Promise is notified of an error.
Note that the zero-arg await() method does not block indefinitely. It will only block for a period of milliseconds as defined in the System property reactor.await.defaultTimeout. This value is 30 seconds by default. It can be changed by setting this property in META-INF/reactor/default.properties (or any other active profileName.properties).
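For readers more familiar with the JDK, a bounded wait of this kind resembles `CompletableFuture.get(timeout, unit)`. The sketch below is an analogy using JDK classes only, not Reactor's await(); the class name and the choice to return a fallback value on timeout are assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// JDK analogy for a bounded blocking wait; hypothetical helper, not Reactor API.
final class AwaitSketch {

    // Blocks for at most timeoutMillis; returns the fallback if no value arrived in time.
    static <T> T awaitOrDefault(CompletableFuture<T> future, long timeoutMillis, T fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        System.out.println(awaitOrDefault(pending, 50L, "timed out"));
        pending.complete("Hello World!");
        System.out.println(awaitOrDefault(pending, 50L, "timed out"));
    }
}
```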
Processor (LMAX Disruptor) Support
The Processor is a thin abstraction around the LMAX Disruptor RingBuffer designed for high performance. Unlike a Reactor, a Processor has no awareness of dispatching or Selectors and doesn’t directly support dynamic Consumer assignment (that can be achieved by using the DelegatingConsumer). The primary goal of the Processor is to expose the power of the Disruptor RingBuffer as closely to the core as possible, without introducing unnecessary overhead.
Creation of a Processor happens through a ProcessorSpec:
/**
* Frame object for use inside a RingBuffer.
*/
public class Frame {
int type;
Buffer message;
}
// Create a Processor for handling Frames
Processor<Frame> processor = new ProcessorSpec<Frame>()
.dataSupplier(new Supplier<Frame>() {
public Frame get() {
return new Frame();
}
})
.consume(new Consumer<Frame>() {
public void accept(Frame frame) {
// handle each updated Frame of data
switch(frame.type) {
case 0:
// handle error frame
break;
case 1:
// handle response frame
break;
default:
break;
}
}
})
.get();
// Producer prepares operations and updates Frame data
for(Event<Message> evt : events) {
Message msg = evt.getData();
Operation<Frame> op = processor.prepare();
Frame f = op.get();
f.type = msg.getType();
f.message = msg.getBuffer();
// Consumer is invoked on Operation.commit(),which is RingBuffer.publish()
op.commit();
}
Batching
The Processor can also handle operations in batches of any size (though we’ve found that smaller batches generally offer higher throughput) using the batch(int size, Consumer<Frame> mutator) method. Here’s an example of creating a batch the same size as the List<Message> events of incoming messages:
processor.batch(events.size(),new Consumer<Frame>() {
ListIterator<Message> msgs = events.listIterator();
public void accept(Frame frame) {
Message msg = msgs.next();
frame.type = msg.getType();
frame.message = msg.getBuffer();
}
});
There’s no need to call commit() for each operation in batch mode. The Consumer<T> passed into the batch method is a mutator whose purpose is to set the values of the data object before it is implicitly committed (published).
Spring Support
Reactor provides out-of-the-box support for Spring ApplicationContexts by providing a BeanPostProcessor implementation that finds annotated beans and wires them into Reactors using SpEL. It also provides some helper FactoryBean implementations for creating Reactor Environment and Reactor instances.
@EnableReactor
Since creating the initial Environment is a standard part of using Reactor in any application, reactor-spring provides a JavaConfig annotation that you put on your @Configuration bean to implicitly create an Environment based on the default properties file bootstrapping mechanism. You don’t have to create an Environment bean explicitly if you use @EnableReactor.
Using the @EnableReactor annotation also configures the BeanPostProcessor to post-process your beans that have methods annotated with the @Selector annotation. Here’s an example of a POJO bean definition that uses annotations to consume events published to a Reactor bean defined in the same ApplicationContext:
/**
* HandlerBean.java
*/
@Consumer
public class HandlerBean {
@Selector(value="test.topic", reactor="@rootReactor")
public void handleTestTopic(Event<String> evt) {
// handle the event
}
}
/**
* ReactorConfig.java
*/
@Configuration
@EnableReactor
@ComponentScan
public class ReactorConfig {
@Bean
public Reactor rootReactor(Environment env) {
// implicit Environment is injected into bean def method
return Reactors.reactor().env(env).get();
}
}
Any other components that also have the same Reactor injected into them can publish events to it, while the POJO handler beans can handle the events.
@Service
public class TestService {
@Autowired
private Reactor rootReactor;
public void fireEvent(String s) {
// publish with the key that HandlerBean selects on
rootReactor.notify("test.topic", Event.wrap(s));
}
}
Request/Reply with Annotated Handlers
If you’re using annotated handler beans as Consumers using the @Selector annotation, your method can also serve as a request/reply handler by returning a value. To tell the BeanPostProcessor where to send the return value, use the @ReplyTo("topic") annotation on your handler method.
@Consumer
public class HandlerBean {
@Autowired
@Qualifier("rootReactor")
private Reactor reactor;
@Selector("test.topic")
@ReplyTo("reply.topic")
public String handleTestTopic(Event<String> evt) {
return "Hello World!";
}
}
Reactor contains a couple of implementations of Spring’s AsyncTaskExecutor interface [1]. The goal of these components is to provide a scalable and predictable executor for arbitrary Runnable tasks that can scale to enormously high loads beyond what a traditional ThreadPoolExecutor is capable of handling. Using these TaskExecutor implementations in preference to the standard Spring TaskExecutors will yield significantly higher overall throughput at high loads and, maybe more importantly, a predictable and small heap usage as tasks scale from thousands to millions.
Usage
To use a single-threaded RingBufferAsyncTaskExecutor and get the highest possible throughput, set up a bean in your configuration:
@Configuration
@EnableReactor
public class AppConfig {
@Bean
public AsyncTaskExecutor singleThreadAsyncTaskExecutor(Environment env) {
return new RingBufferAsyncTaskExecutor(env)
.setName("ringBufferExecutor")
.setBacklog(2048)
.setProducerType(ProducerType.SINGLE)
.setWaitStrategy(new YieldingWaitStrategy());
}
}
Every component that uses this shared TaskExecutor will dispatch tasks to the exact same thread in the internal RingBuffer. Using ProducerType.SINGLE ensures the highest possible throughput but requires the component that is submitting tasks to be single-threaded. If that’s not the case and tasks are being dispatched into this executor from multiple threads, then leave these configuration options off and take the default, which is ProducerType.MULTI.
Multi-threaded RingBuffer Usage
To configure a ThreadPoolExecutor replacement that uses multiple threads, the configuration of the WorkQueueAsyncTaskExecutor is identical to the above with the exception of being able to specify the number of threads to be used as event handlers.
@Configuration
@EnableReactor
public class AppConfig {
@Bean
public AsyncTaskExecutor workQueueAsyncTaskExecutor(Environment env) {
return new WorkQueueAsyncTaskExecutor(env)
.setName("workQueueExecutor")
.setBacklog(2048)
.setThreads(4)
.setProducerType(ProducerType.SINGLE)
.setWaitStrategy(new YieldingWaitStrategy());
}
}
By default, the WorkQueueAsyncTaskExecutor will be configured with a thread count equal to the number of processors in the machine.
TcpClient
Reactor includes a powerful but easy-to-use TCP client that allows you to asynchronously communicate with TCP servers. The Codec support is common between the TcpClient and TcpServer, so whatever codecs work for the server will also work for the client. The TcpClient also includes support for heartbeat during connection idle times and intelligent reconnect in case of a connection being dropped.
Connecting to a Server
Like other Reactor components, the TcpClient isn’t created directly but is obtained through configuration of a TcpClientSpec. The only implementation included with Reactor by default is a Netty 4 implementation of TcpClient, but it’s important to note that other implementations could be created.
Create a TcpClientSpec, configure it with the appropriate codec and handler, and connect to the server:
Environment env = new Environment();
// create a spec using the Netty-based client
TcpClient<String,String> client = new TcpClientSpec<String,String>(NettyTcpClient.class)
.env(env)
.codec(StandardCodecs.LINE_FEED_CODEC)
.connect("localhost",8080)
.get();
client.open().consume(new Consumer<TcpConnection<String,String>>() {
public void accept(TcpConnection<String,String> conn) {
conn.in().consume(new Consumer<String>() {
public void accept(String line) {
// handle lines of incoming data
}
});
// write data to the server
conn.send("Hello World!");
}
});
Smart Reconnect
The zero-arg open() method returns a Promise<TcpConnection<IN,OUT>> which will be fulfilled when the connection is made. A Promise is only fulfilled once, so the handler that will accept this connection will only ever be invoked the first time a connection is made using this client. If the transaction is short-lived and the network is reliable, that should be sufficient. But if the connections are long-lived (granted, a term which is open to interpretation) there is a good chance that the connection will be dropped for some reason.
Reactor’s TcpClient offers a smart reconnect option that allows you to specify whether to attempt a reconnect and how long to wait before doing so. It’s even possible to change the host to which a reconnect attempt is made, if your reconnect algorithm involves switching among a set of hosts to which you want to stay connected in a high-availability situation.
To use the reconnect functionality, you need to pass an implementation of the Reconnect interface when you open a connection:
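Environment env = new Environment();
TcpClient<String, String> client = new TcpClientSpec<String, String>(NettyTcpClient.class)
.env(env)
.codec(StandardCodecs.LINE_FEED_CODEC)
.connect("localhost", 8080)
.get();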
Stream<TcpConnection<String,String>> connections = client.open(new Reconnect() {
public Tuple2<InetSocketAddress,Long> reconnect(InetSocketAddress currentAddress,int attempt) {
// try reconnecting 3 times
switch (attempt) {
case 1:
return Tuple.of(currentAddress,100L);
case 2:
return Tuple.of(currentAddress,500L);
case 3:
return Tuple.of(currentAddress,1000L);
default:
// try to connect somewhere else
return Tuple.of(nextHost(currentAddress),0L);
}
}
});
connections.consume(new Consumer<TcpConnection<String, String>>() {
public void accept(TcpConnection<String, String> conn) {
// handle each connection, including reconnects
}
});
Heartbeats
The Netty client has an idle timeout ChannelListener which is exposed in Reactor via the ConsumerSpec returned from TcpClient.on(). The ConsumerSpec is a simple way to wire handlers to events that a component might emit throughout its lifetime. The TcpClient ConsumerSpec exposes three events: close, readIdle, and writeIdle.
To attach a handler to one of these events, call the appropriate method (using the Stream created in the example above):
connections.consume(new Consumer<TcpConnection<String, String>>() {
  public void accept(final TcpConnection<String, String> conn) {
    // handle each connection, including reconnects
    conn.on()
        .close(new Runnable() {
          public void run() {
            // handle connection close event
          }
        })
        .readIdle(5000, new Runnable() {
          public void run() {
            // no data read for 5 seconds, ping client
            conn.send("ping");
          }
        })
        .writeIdle(5000, new Runnable() {
          public void run() {
            // no data written for 5 seconds, ping client
            conn.send("ping");
          }
        });
  }
});
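To make the difference between readIdle and writeIdle concrete, here is a minimal sketch of the bookkeeping behind an idle timeout. It is not how Netty or Reactor implement it; the class IdleTracker is hypothetical, and timestamps are passed in explicitly so the logic stays deterministic.

```java
// Hypothetical idle tracker: remembers when data was last read and last written.
class IdleTracker {
  private final long timeoutMillis;
  private long lastReadMillis;
  private long lastWriteMillis;

  IdleTracker(long timeoutMillis, long nowMillis) {
    this.timeoutMillis = timeoutMillis;
    this.lastReadMillis = nowMillis;
    this.lastWriteMillis = nowMillis;
  }

  // Call whenever data arrives from the peer.
  void onRead(long nowMillis) {
    lastReadMillis = nowMillis;
  }

  // Call whenever data is sent to the peer.
  void onWrite(long nowMillis) {
    lastWriteMillis = nowMillis;
  }

  // True when nothing has been read for at least the timeout.
  boolean isReadIdle(long nowMillis) {
    return nowMillis - lastReadMillis >= timeoutMillis;
  }

  // True when nothing has been written for at least the timeout.
  boolean isWriteIdle(long nowMillis) {
    return nowMillis - lastWriteMillis >= timeoutMillis;
  }
}
```

The two timers are independent: sending a ping resets only the write timer, so a peer that never answers will still trip the read-idle check.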
TcpServer
Reactor includes powerful abstractions for high-speed data ingestion via TCP. The TcpServer comes with a default implementation based on Netty 4 but is extensible, and other implementations could be plugged in. As with most components in Reactor, TcpServer isn’t instantiated directly. Instead, a standard TcpServerSpec is created and configured, and the spec provides user code with the configured server, ready to start.
To create a TcpServer, start with a TcpServerSpec<IN,OUT> whose generic signature matches the type of object you’re working with in your Consumers. If you’re not doing anything with the data, a default pass-through Codec will be used. The standard LineFeedCodec turns incoming data into a String efficiently and splits it on newline characters. This is probably the easiest way to get started working with Reactor’s TCP support.
Environment env = new Environment();
// create a spec using the Netty-based server
TcpServer<String, String> server = new TcpServerSpec<String, String>(NettyTcpServer.class)
    .env(env)
    .codec(StandardCodecs.LINE_FEED_CODEC)
    .consume(new Consumer<NetChannel<String, String>>() {
      public void accept(NetChannel<String, String> conn) {
        // for each connection, process incoming data
        conn.in().consume(new Consumer<String>() {
          public void accept(String line) {
            // handle line feed data
          }
        });
      }
    })
    .get();
server.start();
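Splitting a byte stream on newlines means data can arrive in chunks that end mid-line, so a splitter has to hold back the unfinished tail. The sketch below illustrates that idea only; it is not Reactor's LineFeedCodec, it works on Strings rather than Buffers for brevity, and the class name LineSplitter is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative line splitter; not Reactor's LineFeedCodec implementation.
class LineSplitter {
  // Holds text received so far that has not been terminated by '\n'.
  private final StringBuilder pending = new StringBuilder();

  // Appends a chunk and returns every line it completes, without the '\n'.
  List<String> feed(String chunk) {
    List<String> lines = new ArrayList<String>();
    pending.append(chunk);
    int newline;
    while ((newline = pending.indexOf("\n")) >= 0) {
      lines.add(pending.substring(0, newline));
      pending.delete(0, newline + 1);
    }
    return lines;
  }

  public static void main(String[] args) {
    LineSplitter splitter = new LineSplitter();
    System.out.println(splitter.feed("hel"));
    System.out.println(splitter.feed("lo\nwor"));
    System.out.println(splitter.feed("ld\n"));
  }
}
```

Because the splitter keeps per-stream state, one instance belongs to one connection.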
Codec Support
Reactor’s TcpServer and TcpClient support a Codec to transform data from and to raw Buffers. There are no special Encoder and Decoder interfaces to implement. The Codec functionality simply provides a way to hand off instances of Function<IN,OUT> to the server.
A good example Codec is the StringCodec, which creates a new Function for each connection because it internally uses a CharsetDecoder and CharsetEncoder for fast and efficient conversion of byte data to String. Here’s the entire implementation:
package reactor.tcp.encoding;

import reactor.function.Consumer;
import reactor.function.Function;
import reactor.io.Buffer;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;

public class StringCodec implements Codec<Buffer, String, String> {

  private final Charset utf8 = Charset.forName("UTF-8");

  @Override
  public Function<Buffer, String> decoder(Consumer<String> next) {
    return new StringDecoder(next);
  }

  @Override
  public Function<String, Buffer> encoder() {
    return new StringEncoder();
  }

  private class StringDecoder implements Function<Buffer, String> {
    private final Consumer<String> next;
    private final CharsetDecoder decoder = utf8.newDecoder();

    private StringDecoder(Consumer<String> next) {
      this.next = next;
    }

    @Override
    public String apply(Buffer bytes) {
      try {
        String s = decoder.decode(bytes.byteBuffer()).toString();
        if (null != next) {
          next.accept(s);
          return null;
        } else {
          return s;
        }
      } catch (CharacterCodingException e) {
        throw new IllegalStateException(e);
      }
    }
  }

  private class StringEncoder implements Function<String, Buffer> {
    private final CharsetEncoder encoder = utf8.newEncoder();

    @Override
    public Buffer apply(String s) {
      try {
        ByteBuffer bb = encoder.encode(CharBuffer.wrap(s));
        return new Buffer(bb);
      } catch (CharacterCodingException e) {
        throw new IllegalStateException(e);
      }
    }
  }
}
Whether the Codec returns a new instance of Function<IN,OUT> from the decoder(Consumer<T> next) and encoder() methods is domain-specific. In cases where transcoding is stateless, it’s safe to create the encoders and decoders as instance (or even static) members of the Codec implementation.
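The stateless-versus-stateful distinction can be sketched with plain JDK types. The example below uses java.util.function.Function as a stand-in for reactor.function.Function (an assumption so the snippet runs without Reactor), and the class name CodecInstances is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

class CodecInstances {

  // Stateless: Charset.decode is documented as safe for concurrent use,
  // so one shared instance can serve every connection.
  static final Function<ByteBuffer, String> SHARED_DECODER =
      bytes -> StandardCharsets.UTF_8.decode(bytes).toString();

  // Stateful: a CharsetDecoder is not safe for concurrent use, so each call
  // returns a function wrapping its own private decoder.
  static Function<ByteBuffer, String> newDecoder() {
    final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
    return bytes -> {
      try {
        return decoder.decode(bytes).toString();
      } catch (CharacterCodingException e) {
        throw new IllegalStateException(e);
      }
    };
  }

  public static void main(String[] args) {
    ByteBuffer bytes = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
    System.out.println(SHARED_DECODER.apply(bytes.duplicate()));
    System.out.println(newDecoder().apply(bytes.duplicate()));
  }
}
```

A codec built like SHARED_DECODER could return the same Function from every call, while one built like newDecoder() should hand out a fresh Function per connection, as StringCodec does.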
Request/Reply
Although Reactor’s TCP support is a strong fit for high-volume ingest, you can use the TcpServer as a general TCP server. When ready to communicate asynchronously with the client, call the NetChannel.send(OUT) method.
TcpServer<String, String> server = new TcpServerSpec<String, String>(NettyTcpServer.class)
    .env(env)
    .codec(StandardCodecs.LINE_FEED_CODEC)
    .consume(new Consumer<NetChannel<String, String>>() {
      public void accept(final NetChannel<String, String> conn) {
        // for each connection, process incoming data
        conn.in().consume(new Consumer<String>() {
          public void accept(String line) {
            // handle line feed data
            // respond to client (newline will be added by codec)
            conn.send("Hello World!");
          }
        });
      }
    })
    .get();
server.start();
To obtain a Consumer<OUT> to pass to other components that expect a Consumer, call the NetChannel.out() method.
Consumer<NetChannel<String, String>> consumer = new Consumer<NetChannel<String, String>>() {
  public void accept(final NetChannel<String, String> conn) {
    final Consumer<String> out = conn.out();
    // result will be accepted into Consumer and passed to client
    service.doLongLookup(out);
  }
};
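The service object in the snippet above is not defined in this document. A minimal sketch of what such a component might look like follows, assuming it does its work on another thread and pushes the result into the Consumer when done. It uses java.util.function.Consumer in place of reactor.function.Consumer, and the LookupService class and its return value are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical service; stands in for the `service` object used above.
class LookupService {

  // Runs the lookup on a background thread and hands the result to `out`.
  // The returned future lets callers wait for the hand-off if they need to.
  CompletableFuture<Void> doLongLookup(Consumer<String> out) {
    return CompletableFuture
        .supplyAsync(() -> "lookup result") // placeholder for the slow work
        .thenAccept(out);
  }

  public static void main(String[] args) {
    new LookupService().doLongLookup(System.out::println).join();
  }
}
```

The service only sees a Consumer, so it needs no knowledge of the TCP connection behind it.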