
Reactor

Published: 2020-12-15 05:24:38 | Category: Encyclopedia | Source: compiled from the web


Reactor is a foundation for asynchronous applications on the JVM. It provides abstractions for Java, Groovy and other JVM languages to make building event and data-driven applications easier. It’s also really fast. On modest hardware, it’s possible to process around 15,000,000 events per second with the fastest non-blocking Dispatcher. Other dispatchers are available to provide the developer with a range of choices, from thread-pool style, long-running task execution to non-blocking, high-volume task dispatching.

Build Instructions

Reactor uses a Gradle-based build system. Building the code yourself should be a straightforward case of:

git clone git@github.com:reactor/reactor.git
cd reactor
./gradlew test

This should cause the submodules to be compiled and the tests to be run. To install these artifacts to your local Maven repo, use the handy Gradle Maven plugin:

./gradlew install

Installation

Maven artifacts are available in the SpringSource snapshot and milestone repositories. To use Reactor in your own project, add these repositories to your build file’s repository definitions. For a Gradle project, this would be:

ext {
  reactorVersion = '1.1.0.RELEASE'
}
repositories {
  maven { url 'http://repo.springsource.org/libs-release' }
}
dependencies {
  // Reactor Core
  compile "org.projectreactor:reactor-core:$reactorVersion"
  // Reactor Groovy
  compile "org.projectreactor:reactor-groovy:$reactorVersion"
  // Reactor Spring
  compile "org.projectreactor.spring:reactor-spring-context:$reactorVersion"
}

Overview

Reactor is fundamentally event-driven and reactive. It provides abstractions to facilitate publishing and consuming events on Reactors. A Reactor can be backed by a number of different Dispatcher implementations, so that tasks assigned to respond to certain events can be configured to run on a single thread (as in an Actor or pure Reactor pattern), on a thread chosen from a thread pool, or queued in an LMAX Disruptor RingBuffer. Which Dispatcher implementation is needed depends on the kind of work being done: blocking IO operations should be performed in a pooled thread, and fast, non-blocking computations should probably be executed on a RingBuffer.

Getting Started

Each Reactor you create needs a Dispatcher to execute tasks. By default, with no configuration, you’ll get a synchronous Dispatcher. This works fine for testing but is probably not what you want for a real application.

Since it’s not desirable to create too many threads in an asynchronous application, and something has to keep track of those few Dispatchers that are divvied out to the components that need them, you need to instantiate an Environment, which will create those Dispatchers based on either the default configuration (provided in a properties file in the Reactor JAR file) or your own configuration.

The Environment

There’s no magic to it. You simply "new" up an instance of Environment and, when creating Reactors, Streams, and Promises, pass a reference to this Environment into the Specs (essentially a "builder" helper class). The Environment instance is where the system properties prefixed with reactor. live, and it’s also where the small number of Dispatchers reside that are intended to be used by any component in your application that needs one.

You can, of course, create Dispatchers directly in your code. There may be times, such as when embedding in other threading models, where that’s desirable. But in general, you should refrain from directly instantiating your own Dispatchers and instead use those configured to be created by the Environment.

Events, Selectors and Consumers

Three of the most foundational components in Reactor’s reactor-core module are the Selector, the Consumer, and the Event. A Consumer can be assigned to a Reactor by using a Selector, which is used to find the Consumers to invoke for an Event. A range of default selectors is available, from plain Strings to regular expressions to Spring MVC-style URL templates.

Selector Matching

There are different kinds of Selectors for doing different kinds of matching. The simplest form is just to match one object with another. For example, a Selector created from a String "parse" will match another Selector whose wrapped object is also a String "parse" (in this case it’s just like String.equals(String)).

But a Selector can also match another Selector based on Class.isAssignableFrom(Class<?>), regular expressions, URL templates, or the like. There are helper methods on the Selectors abstract class to make creating these Selectors very easy in user code.
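The matching rules described above can be sketched in plain Java. This is only an illustration under stated assumptions: SelectorSketch and KeyMatcher are hypothetical names, not Reactor types, and whole-string regex matching is a choice made for this sketch rather than a statement about how Reactor’s own regex selector behaves.

```java
import java.util.regex.Pattern;

// Hypothetical stand-ins for the matching rules described above; not Reactor's Selector API.
final class SelectorSketch {

  interface KeyMatcher {
    boolean matches(Object key);
  }

  // Object matching: the key must equal the wrapped object, as with String.equals(String).
  static KeyMatcher object(Object wrapped) {
    return key -> wrapped.equals(key);
  }

  // Class matching: the key must be a Class that the wrapped type is assignable from.
  static KeyMatcher type(Class<?> wrapped) {
    return key -> key instanceof Class && wrapped.isAssignableFrom((Class<?>) key);
  }

  // Regex matching: the whole key string must match (an assumption of this sketch).
  static KeyMatcher regex(String expression) {
    Pattern pattern = Pattern.compile(expression);
    return key -> key instanceof String && pattern.matcher((String) key).matches();
  }

  public static void main(String[] args) {
    System.out.println(object("parse").matches("parse"));
    System.out.println(type(RuntimeException.class).matches(IllegalArgumentException.class));
    System.out.println(regex("topic\\.([a-z0-9]+)").matches("topic.a1"));
    System.out.println(regex("topic\\.([a-z0-9]+)").matches("other.a1"));
  }
}
```

Keeping each rule behind the same one-method interface is what lets a registry treat all kinds of keys uniformly when looking for consumers.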

Here’s an example of wiring a Consumer to a Selector on a Reactor:

// This helper method is like jQuery’s.
// It just creates a Selector instance so you don’t have to "new" one up.
import static reactor.event.selector.Selectors.$;
Environment env = new Environment();
// This factory call creates a Reactor.
Reactor reactor = Reactors.reactor()
  .env(env) // our current Environment
  .dispatcher(Environment.EVENT_LOOP) // use one of the BlockingQueueDispatchers
  .get(); // get the object when finished configuring
// Register a consumer to handle events sent with a key that matches "parse"
reactor.on($("parse"),new Consumer<Event<String>>() {
  @Override
  public void accept(Event<String> ev) {
    System.out.println("Received event with data: " + ev.getData());
  }
});
// Send an event to this Reactor and trigger all actions that match the given key
reactor.notify("parse",Event.wrap("data"));

In Java 8, the event wiring would become extremely succinct:

// Use a POJO as an event handler
class Service {
  public <T> void handleEvent(Event<T> ev) {
    // handle the event data
  }
}
@Inject
Service service;
// Use a method reference to create a Consumer<Event<T>>
reactor.on($("parse"),service::handleEvent);
// Notify consumers of the 'parse' topic that data is ready
// by passing a Supplier<Event<T>> in the form of a lambda
reactor.notify("parse", () -> slurpNextEvent());

Event Headers

Events have optional associated metadata in the headers property. Events are meant to be stateless helpers that provide a consumer with an argument value and related metadata. If you need to communicate information to consumer components, like the IDs of other Reactors that have an interest in the outcome of the work done on this Event, then set that information in a header value.

// Just use the default selector instead of a specific one
r.on(new Consumer<Event<String>>() {
    public void accept(Event<String> ev) {
      String otherData = ev.getHeaders().get("x-custom-header");
      // do something with this other data
    }
});
Event<String> ev = Event.wrap("Hello World!");
ev.getHeaders().set("x-custom-header","ID_TO_ANOTHER_REACTOR");
r.notify(ev);

Registrations

When assigning a Consumer to a Reactor, a Registration is provided to the caller to manage that assignment. Registrations can be cancelled, which removes them from the Reactor. If you don’t want to remove a consumer entirely but just want to pause its execution for a time, you can call pause() and later resume(); while a Registration is paused, the Dispatcher skips over that Consumer when finding Consumers that match a given Selector.

Registration reg = r.on($("test"),new Consumer<Event<?>>() { … });
// pause this consumer so it's not executed for a time
reg.pause();
// later decide to resume it
reg.resume();

Routing

Reactor includes five different kinds of routing for assigned consumers: broadcast, random, round-robin, first, and last. This means that, of the given Consumers assigned to the same Selector, the routing type determines which ones are invoked for an event.

Usage Guide

Creating an Environment

An Environment is aware of active "profiles" (the default profile is named "default"). The default environment properties reader will look for properties files in the classpath at location META-INF/reactor/default.properties and, if the system property reactor.profiles.active is set to a comma-separated list of profile names, the properties reader will look for a META-INF/reactor/$PROFILE.properties file for each profile referenced.
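The lookup described above can be sketched as a small path resolver. This is a hedged illustration, not Reactor’s properties reader: ProfilePaths and resolve are hypothetical names, and the order in which files are listed (default first, then each active profile) is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that lists the classpath locations a profile-aware reader would consult.
final class ProfilePaths {

  // defaultProfile is normally "default"; activeProfiles is the comma-separated
  // value of the reactor.profiles.active system property, or null when unset.
  static List<String> resolve(String defaultProfile, String activeProfiles) {
    List<String> paths = new ArrayList<>();
    paths.add("META-INF/reactor/" + defaultProfile + ".properties");
    if (activeProfiles != null) {
      for (String name : activeProfiles.split(",")) {
        String trimmed = name.trim();
        if (!trimmed.isEmpty()) {
          paths.add("META-INF/reactor/" + trimmed + ".properties");
        }
      }
    }
    return paths;
  }

  public static void main(String[] args) {
    String active = System.getProperty("reactor.profiles.active");
    for (String path : resolve("default", active)) {
      System.out.println(path);
    }
  }
}
```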

The Environment maintains references to the Dispatchers created from the profile’s properties file. In the default configuration, only one Environment instance is needed per JVM to keep the number of threads used by the Dispatchers relatively small.

final Environment env = new Environment();

Pass a reference to this Environment when creating components that require a Dispatcher.

// Reactors are created using a ReactorSpec obtained via factory method
Reactor r = Reactors.reactor().env(env).get();


Handling Events

To assign a handler for a specific Event, a Selector is required to do the matching.

reactor.on($("topic"),new Consumer<Event<Message>>() { ... });
// if you don't like the $,use the `object()` method
reactor.on(Selectors.object("topic"),new Consumer<Event<Message>>() { ... });

To notify the Reactor of an available Event, call the notify() method:

Message msg = msgService.nextMessage();
reactor.notify("topic",Event.wrap(msg));

Several types of built-in Selectors are available to match by Object, Class, RegEx, or URI template.

It’s common in Reactor applications to publish errors by the exception class. To handle errors this way, use the ClassSelector.

// T() is a static helper function on the Selectors object to create a ClassSelector
reactor.on(T(IllegalArgumentException.class),new Consumer<Event<IllegalArgumentException>>() { ... });

Errors will be published using the exception class type as the key:

try {
  // do something that might generate an exception
} catch (Throwable t) {
  reactor.notify(t.getClass(),Event.wrap(t));
}

Topic selection can be done with regular expressions.

// R() is a static helper function on the Selectors object to create a RegexSelector
reactor.on(R("topic.([a-z0-9]+)"),new Consumer<Event<Message>>() { ... });
// ...or...
reactor.on(Selectors.regex("topic.([a-z0-9]+)"),new Consumer<Event<Message>>() { ... });

Topic selection can also be done with URI templates.

// U() is a static helper method on the Selectors object to create a UriTemplateSelector
reactor.on(U("/topic/{name}"),new Consumer<Event<Message>>() {
  public void accept(Event<Message> ev) {
    String name = ev.getHeaders().get("name");
  }
});
// ...or...
reactor.on(Selectors.uri("/topic/{name}"), new Consumer<Event<Message>>() { ... });

Event Routing

Reactors send events to Consumers based on the EventRouter in use. By default, no Consumers are filtered out of the selection, which means all Consumers subscribed to a given Selector will be invoked. A couple of built-in filters provide round-robin and random filtering of Consumers, so that if multiple Consumers are subscribed to the same Selector, they will not all be invoked (as is the case with the default PassThroughFilter); instead, a single Consumer will be selected in round-robin fashion. Similarly, using a RandomFilter will randomly select a Consumer.
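The difference between pass-through and round-robin filtering can be sketched in plain Java. RoutingSketch, passThrough and RoundRobin are hypothetical names used only for illustration; they are not Reactor’s filter classes, and the sketch ignores dispatching entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins for the filtering behaviours described above; not Reactor's filters.
final class RoutingSketch {

  // Pass-through: every matching consumer is selected.
  static <T> List<T> passThrough(List<T> consumers) {
    return new ArrayList<>(consumers);
  }

  // Round-robin: exactly one consumer per call, cycling through the list in order.
  static final class RoundRobin {
    private final AtomicInteger next = new AtomicInteger();

    <T> List<T> select(List<T> consumers) {
      List<T> chosen = new ArrayList<>();
      if (!consumers.isEmpty()) {
        int index = Math.floorMod(next.getAndIncrement(), consumers.size());
        chosen.add(consumers.get(index));
      }
      return chosen;
    }
  }

  public static void main(String[] args) {
    List<String> consumers = Arrays.asList("a", "b", "c");
    System.out.println(passThrough(consumers));
    RoundRobin roundRobin = new RoundRobin();
    for (int i = 0; i < 4; i++) {
      System.out.println(roundRobin.select(consumers));
    }
  }
}
```

A random filter would have the same shape as RoundRobin, with the index drawn from a random number generator instead of a counter.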

The Reactor Environment

Before you can do anything useful with Reactor, you need to create a long-lived Environment instance which holds the configured Dispatchers and properties that influence how Reactor components behave. Each instance of an Environment will contain its own set of Dispatchers, so be careful how many instances of these you create. If possible, only one instance per JVM is recommended.

Create a new Environment using the default properties file configuration reader, which reads the configuration from META-INF/reactor/default.properties:

Environment env = new Environment();

When you create Reactors, Promises, and other components that require a Dispatcher, pass this Environment instance to the Spec for that component. For example, to reference this Environment when creating Reactors:

Reactor r = Reactors.reactor()
  .env(env)
  .get();

Dispatchers

A default set of Dispatchers is created, and it should cover the majority of use cases. But it might be the case that you don’t need a particular type of Dispatcher at all (say you’re only using the RingBufferDispatcher), so you want to prevent that type of Dispatcher from being configured. You’d simply make a copy of the Reactor default.properties and set the .type of the Dispatcher you want to turn off to blank.

By default, a ThreadPoolExecutorDispatcher is configured.

reactor.dispatchers.threadPoolExecutor.type = threadPoolExecutor
reactor.dispatchers.threadPoolExecutor.size = 0
# Backlog is how many Task objects to warm up internally
reactor.dispatchers.threadPoolExecutor.backlog = 1024

If you don’t need this Dispatcher, then set the property reactor.dispatchers.threadPoolExecutor.type to blank:

reactor.dispatchers.threadPoolExecutor.type =

Profiles

If you don’t want to change the default properties, you can activate a "profile". To activate a profile named "testing", create a properties file called META-INF/reactor/testing.properties. When you run your Reactor application, activate the profile by referencing it in the reactor.profiles.active System property:

$ java -Dreactor.profiles.active=testing -jar reactor-app-1.0.jar

If you don’t want Reactor’s default properties to be read at all, create a .properties file with the configuration you want and set the system property reactor.profiles.default to the name of the profile you want to load as the "default" profile. For example, to use the "production" profile as the default, create a file called META-INF/reactor/production.properties. When running your Reactor application, set the reactor.profiles.default System property to "production":

$ java -Dreactor.profiles.default=production -jar reactor-app-1.0.jar

The Default Dispatcher

When creating components that need Dispatchers, unless you specify a particular Dispatcher to use, Reactor will give your component the "default" Dispatcher. In its default configuration, that would be the "ringBuffer" Dispatcher. To change that, set the System property reactor.dispatchers.default to the name of the Dispatcher to be considered the default:

# change to the eventLoop for a default Dispatcher
reactor.dispatchers.default = eventLoop

Tuples

A Consumer only has one argument in its accept(T ob) method. But it’s often convenient to pass multiple arguments to a Consumer. Tuples are a convenience class provided by Reactor to help. Tuples are type-safe and can be used to replace custom beans with strongly-typed properties. They are similar to Scala’s Product and Tuple classes.

For example, to pass a String and a Float to an event Consumer, create a Tuple2<String, Float>.

// Float.valueOf has no double overload, so use a float literal
Tuple2<String, Float> tuple = Tuple.of("Hello World!", Float.valueOf(1.0f));
reactor.notify("topic",Event.wrap(tuple));
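To show why a typed pair can replace a one-off bean, here is a minimal sketch of the idea in plain Java. PairSketch is a hypothetical class written for this illustration; it is not Reactor’s Tuple2 and makes no claim about how that class is implemented.

```java
// Hypothetical two-element tuple; illustrates type-safe access without casts.
final class PairSketch<A, B> {
  private final A first;
  private final B second;

  private PairSketch(A first, B second) {
    this.first = first;
    this.second = second;
  }

  // Static factory so callers get the element types inferred from the arguments.
  static <A, B> PairSketch<A, B> of(A first, B second) {
    return new PairSketch<>(first, second);
  }

  A getFirst() {
    return first;
  }

  B getSecond() {
    return second;
  }

  public static void main(String[] args) {
    PairSketch<String, Float> pair = PairSketch.of("Hello World!", 1.0f);
    String text = pair.getFirst();   // no cast needed
    float number = pair.getSecond(); // unboxed from Float
    System.out.println(text + " " + number);
  }
}
```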

Streams

A Stream is a stateless event processor that offers a reactive alternative to callback spaghetti programming. Values passing through a Stream can be transformed, filtered, and consumed by calling the appropriate method. Entire graphs of operations can be constructed from a Stream, since many methods transparently return new Streams which start new flows of reactions.

Deferred and Composables

Streams aren’t created directly, since a Stream is a consumer-side concern. The producer side is called a Deferred. The Deferred is the object that ties the producer to the consumer, and it’s this object that data is passed into. A Deferred is also a Consumer, so to pass data into a Stream, call the accept(T) method.

Environment env = new Environment();
// Create a deferred for accepting stream values
Deferred<String,Stream<String>> deferred = Streams.<String>defer()
  .env(env)
  .dispatcher(RING_BUFFER)
  .get();
Stream<String> stream = deferred.compose();
// consume values
stream.consume(new Consumer<String>() {
  public void accept(String s) {
    // handle string when available
  }
});
// producer calls accept
deferred.accept("Hello World!");

Generally the two halves of the Deferred/Stream combination are not in the same scope. If you’re creating an API for your application, you would likely create a Deferred inside a method call and retain a reference to that Deferred so you can publish values to it asynchronously. You would then return a Stream<T> to the caller so it could interact with the values.

Composition Methods

The real power of the Stream API lies in the composition functions for transforming and filtering data.

Stream<String> filtered = stream
  .map(new Function<String,String>() {
    public String apply(String s) {
      // turn input String into output String
      return s.toLowerCase();
    }
  })
  .filter(new Predicate<String>() {
    public boolean test(String s) {
      // test String
      return s.startsWith("nsprefix:");
    }
  });

The filtered Stream now contains only lowercased values that start with "nsprefix:".

Unbounded or Batched

Streams are by default unbounded, which means they have no beginning and no end. Streams also have a method called batch(int size) which creates a new Stream that works with values in batches. In batch mode, Stream methods like first() and last() make it easy to work with the beginning and ends of batches.

The first() method provides a Stream whose values are only the first of the Stream, in the case of an unbounded Stream, or the first value of each batch, if the Stream is in batch mode. The Stream returned from last() contains values from the end of a Stream (in unbounded mode, that means whenever flush() is called) or the last values in each batch if the Stream is in batch mode.

Values can be collected into a List<T> by calling Stream.collect(). This new Stream is populated by accumulated values whenever flush() is called or the configured batch size is reached.
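The batch semantics above can be illustrated on an ordinary list. This is a sketch under stated assumptions: BatchSketch and its methods are hypothetical, they work on an in-memory List rather than a Stream, and the trailing partial batch stands in for what a flush() would release.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical list-based stand-in for batch mode; not Reactor's Stream API.
final class BatchSketch {

  // Splits values into consecutive batches of at most `size` elements.
  // The trailing partial batch stands in for what a flush() would release.
  static <T> List<List<T>> batch(List<T> values, int size) {
    if (size <= 0) {
      throw new IllegalArgumentException("size must be positive");
    }
    List<List<T>> batches = new ArrayList<>();
    for (int start = 0; start < values.size(); start += size) {
      int end = Math.min(start + size, values.size());
      batches.add(new ArrayList<>(values.subList(start, end)));
    }
    return batches;
  }

  // The first value of each batch.
  static <T> List<T> first(List<List<T>> batches) {
    List<T> firsts = new ArrayList<>();
    for (List<T> batch : batches) {
      firsts.add(batch.get(0));
    }
    return firsts;
  }

  // The last value of each batch.
  static <T> List<T> last(List<List<T>> batches) {
    List<T> lasts = new ArrayList<>();
    for (List<T> batch : batches) {
      lasts.add(batch.get(batch.size() - 1));
    }
    return lasts;
  }

  public static void main(String[] args) {
    List<List<Integer>> batches = batch(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
    System.out.println(batches);        // [[1, 2, 3], [4, 5, 6], [7]]
    System.out.println(first(batches)); // [1, 4, 7]
    System.out.println(last(batches));  // [3, 6, 7]
  }
}
```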

Reduce

Streams can also be reduced. A reduction can be provided with a Supplier<T>, which returns a new instance of the accumulator at the beginning of the Stream or the beginning of each batch. A Tuple2 is passed into the provided Function, containing the current accumulator value and the next data element passing through the Stream. The value returned from the provided Function will be sent along again as the accumulator value the next time a data value passes through the Stream.
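The accumulator life cycle described above can be sketched on a plain list. The names here are hypothetical, and the sketch swaps in java.util.function.BiFunction for the Tuple2-taking Function the text describes; the end of the input is treated like a flush().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical list-based reduction; not Reactor's Stream API.
final class ReduceSketch {

  // Reduces values batch by batch. The supplier provides a fresh accumulator at the
  // start of each batch; fn combines the current accumulator with the next value.
  static <T, A> List<A> reduce(List<T> values, int batchSize,
                               Supplier<A> initial, BiFunction<A, T, A> fn) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<A> results = new ArrayList<>();
    A accumulator = initial.get();
    int seen = 0;
    for (T value : values) {
      accumulator = fn.apply(accumulator, value);
      if (++seen == batchSize) {
        results.add(accumulator);
        accumulator = initial.get(); // new accumulator for the next batch
        seen = 0;
      }
    }
    if (seen > 0) {
      results.add(accumulator); // treat the end of input like a flush()
    }
    return results;
  }

  public static void main(String[] args) {
    List<Integer> sums = reduce(Arrays.asList(1, 2, 3, 4, 5), 2, () -> 0, (acc, v) -> acc + v);
    System.out.println(sums); // [3, 7, 5]
  }
}
```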

Promises

A Promise is a stateful event processor that accepts either a single value or an error. Once either of these values has been accepted, the Promise is considered complete.

When a Promise is populated with a value, the onSuccess Consumers are invoked. When populated with an error, the onError Consumers are invoked. When either a success or an error is set, the onComplete Consumers are invoked.
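These state rules can be sketched as a small single-threaded class. PromiseSketch is hypothetical: it invokes callbacks synchronously on the calling thread, has no Dispatcher, and is not Reactor’s Promise. Running a callback immediately when it is registered on an already completed promise is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical single-threaded sketch of the state rules described above.
final class PromiseSketch<T> {
  private final List<Consumer<T>> successConsumers = new ArrayList<>();
  private final List<Consumer<Throwable>> errorConsumers = new ArrayList<>();
  private final List<Runnable> completeActions = new ArrayList<>();
  private boolean complete;
  private T value;
  private Throwable error;

  PromiseSketch<T> onSuccess(Consumer<T> consumer) {
    if (!complete) successConsumers.add(consumer);
    else if (error == null) consumer.accept(value);
    return this;
  }

  PromiseSketch<T> onError(Consumer<Throwable> consumer) {
    if (!complete) errorConsumers.add(consumer);
    else if (error != null) consumer.accept(error);
    return this;
  }

  PromiseSketch<T> onComplete(Runnable action) {
    if (!complete) completeActions.add(action);
    else action.run();
    return this;
  }

  // Populates the promise with a value; only one outcome is ever accepted.
  void accept(T newValue) {
    markComplete();
    value = newValue;
    for (Consumer<T> consumer : successConsumers) consumer.accept(newValue);
    for (Runnable action : completeActions) action.run();
  }

  // Populates the promise with an error.
  void fail(Throwable cause) {
    if (cause == null) throw new NullPointerException("cause");
    markComplete();
    error = cause;
    for (Consumer<Throwable> consumer : errorConsumers) consumer.accept(cause);
    for (Runnable action : completeActions) action.run();
  }

  private void markComplete() {
    if (complete) throw new IllegalStateException("promise is already complete");
    complete = true;
  }

  public static void main(String[] args) {
    PromiseSketch<String> promise = new PromiseSketch<>();
    promise.onSuccess(v -> System.out.println("success: " + v))
           .onError(t -> System.out.println("error: " + t))
           .onComplete(() -> System.out.println("complete"));
    promise.accept("Hello World!");
  }
}
```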

Usage

Promises aren’t created directly. They’re meant to handle future values when they are available. Promises only provide methods for setting Consumers; you can’t publish a value to a Promise. Instead you use a Deferred, which produces a Promise from its compose() method. The Deferred has an accept() method on it which is used to populate the Promise.

Promises also need to be aware of a Dispatcher to use when publishing events. There are static helper methods on the Promises class for creating them.

// Create a deferred for accepting future values
Deferred<String,Promise<String>> deferred = Promises.<String>defer()
  .env(env)
  .dispatcher(RING_BUFFER)
  .get();
// Set a Consumer for the value
deferred.compose().onSuccess(new Consumer<String>() {
  public void accept(String s) {
    // handle string when available
  }
});
// When a value is available,set it on the Deferred
deferred.accept("Hello World!");

Promises can also be created in a success or error state right away if the value is already known. This is useful when a Promise should be returned from a method but the value (or error) is immediately available.

// Create a completed Promise
Promise<String> p = Promises.success("Hello World!").get();
// Consumers assigned on a complete Promise will be submitted immediately
p.onSuccess(new Consumer<String>() {
  public void accept(String s) {
    // handle string when available
  }
});
// Create a Promise in an error state
Promise<String> failed = Promises.<String>error(new IllegalStateException()).get();

Composition

A key feature of Promises is that they are composable. The map(Function<T,V> fn) function can be used to assign a Function<T,V> that will transform the result value T into a V and populate a new Promise<V> with the return value.

Deferred<String, Promise<String>> deferred = Promises.<String>defer().get();
Promise<String> p1 = deferred.compose();
// Transform the String into a Float using map()
Promise<Float> p2 = p1.map(new Function<String, Float>() {
  public Float apply(String s) {
    return Float.parseFloat(s);
  }
});
p2.onSuccess(new Consumer<Float>() {
  public void accept(Float f) {
    // handle transformed value
  }
});
deferred.accept("12.2");

A Promise can also be filtered. A call to the filter(Predicate<T> p) method will return a new Promise<T> that will, depending on the Predicate check, either be populated with the result value or be in error if the Predicate test fails.

Deferred<Float, Promise<Float>> deferred = Promises.<Float>defer().get();
Promise<Float> p1 = deferred.compose();
// Filter based on a Predicate
Promise<Float> p2 = p1.filter(new Predicate<Float>() {
  public boolean test(Float f) {
    return f > 100f;
  }
});
p2.then(
  // onSuccess
  new Consumer<Float>() {
    public void accept(Float f) {
      // handle value
    }
  },
  // onError
  new Consumer<Throwable>() {
    public void accept(Throwable t) {
      // predicate test failed
    }
  }
);
// 12.2 does not pass the predicate, so the onError Consumer is invoked
deferred.accept(12.2f);

Getting the value

Since Promises are designed to be stateful, they also provide methods for getting the value of the Promise. Promises are Suppliers, so a call to the get() method will return the current value of the Promise, whether it is complete or not (Promises also provide methods for checking whether the Promise is complete, a success, or in error).

Promises can block the calling thread waiting for a result. Call one of the await() methods to block until a value is available or the Promise is notified of an error.

Note that the zero-arg await() method does not block indefinitely. It will only block for a period of milliseconds as defined in the System property reactor.await.defaultTimeout. This value is 30 seconds by default. It can be changed by setting this property in META-INF/reactor/default.properties (or any other active profileName.properties).

Processor (LMAX Disruptor) Support

The Processor is a thin abstraction around the LMAX Disruptor RingBuffer designed for high performance. Unlike a Reactor, a Processor has no awareness of dispatching or Selectors and doesn’t directly support dynamic Consumer assignment (that can be achieved by using the DelegatingConsumer). The primary goal of the Processor is to expose the power of the Disruptor RingBuffer as closely to the core as possible, without introducing unnecessary overhead.

Creation of a Processor happens through a ProcessorSpec:

/**
 * Frame object for use inside a RingBuffer.
 */
public class Frame {
  int type;
  Buffer message;
}

// Create a Processor for handling Frames
Processor<Frame> processor = new ProcessorSpec<Frame>()
  .dataSupplier(new Supplier<Frame>() {
    public Frame get() {
      return new Frame();
    }
  })
  .consume(new Consumer<Frame>() {
    public void accept(Frame frame) {

      // handle each updated Frame of data
      switch(frame.type) {
        case 0:
          // handle error frame
          break;
        case 1:
          // handle response frame
          break;
        default:
          break;
      }

    }
  })
  .get();

// Producer prepares operations and updates Frame data
for(Event<Message> evt : events) {
  Message msg = evt.getData();

  Operation<Frame> op = processor.prepare();
  Frame f = op.get();

  f.type = msg.getType();
  f.message = msg.getBuffer();

  // Consumer is invoked on Operation.commit(),which is RingBuffer.publish()
  op.commit();
}

Batching

The Processor can also handle operations in batches of any size (though we’ve found that smaller batches generally offer higher throughput) using the batch(int size, Consumer<Frame> mutator) method. Here’s an example of creating a batch the same size as the List<Message> events of incoming messages:

processor.batch(events.size(),new Consumer<Frame>() {
  ListIterator<Message> msgs = events.listIterator();

  public void accept(Frame frame) {
    Message msg = msgs.next();

    frame.type = msg.getType();
    frame.message = msg.getBuffer();
  }
});

There’s no need to call commit() for each operation in batch mode. The Consumer<T> passed into the batch method is a mutator whose purpose is to set the values of the data object before it is implicitly committed (published).

Spring Support

Reactor provides out-of-the-box support for Spring ApplicationContexts by providing a BeanPostProcessor implementation that finds annotated beans and wires them into Reactors using SpEL. It also provides some helper FactoryBean implementations for creating Reactor Environment and Reactor instances.

@EnableReactor

Since creating the initial Environment is a standard part of using Reactor in any application, reactor-spring provides a JavaConfig annotation that you put on your @Configuration bean to implicitly create an Environment based on the default properties file bootstrapping mechanism. You don’t have to create an Environment bean explicitly if you use @EnableReactor.

Using the @EnableReactor annotation also configures the BeanPostProcessor to post-process your beans that have methods annotated with the @Selector annotation. Here’s an example of a POJO bean definition that uses annotations to consume events published to a Reactor bean defined in the same ApplicationContext:

/**
 * HandlerBean.java
 */
@Consumer
public class HandlerBean {

  @Selector(value = "test.topic", reactor = "@rootReactor")
  public void handleTestTopic(Event<String> evt) {
    // handle the event
  }

}

/**
 * ReactorConfig.java
 */
@Configuration
@EnableReactor
@ComponentScan
public class ReactorConfig {

  @Bean
  public Reactor rootReactor(Environment env) {
    // implicit Environment is injected into bean def method
    return Reactors.reactor().env(env).get();
  }

}

Any other components that also have the same Reactor injected into them can publish events to it, while the POJO handler beans can handle the events.

@Service
public class TestService {

  @Autowired
  private Reactor rootReactor;

  public void fireEvent(String s) {
    rootReactor.notify(Event.wrap(s));
  }

}

Request/Reply with Annotated Handlers

If you’re using annotated handler beans as Consumers using the @Selector annotation, your method can also serve as a request/reply handler by returning a value. To tell the BeanPostProcessor where to send the return value, use the @ReplyTo("topic") annotation on your handler method.

@Consumer
public class HandlerBean {

  @Autowired
  @Qualifier("rootReactor")
  private Reactor reactor;

  @Selector("test.topic")
  @ReplyTo("reply.topic")
  public String handleTestTopic(Event<String> evt) {
    return "Hello World!";
  }

}

AsyncTaskExecutor


Reactor contains a couple of implementations of Spring’s AsyncTaskExecutor interface [1]. The goal of these components is to provide a scalable and predictable executor for arbitrary Runnable tasks that can scale to enormously high loads beyond what a traditional ThreadPoolExecutor is capable of handling. Using these TaskExecutor implementations in preference to the standard Spring TaskExecutors will yield significantly higher overall throughput at high loads and, maybe more importantly, a predictable and small heap usage as tasks scale from thousands to millions.

Usage

To use a single-threaded RingBufferAsyncTaskExecutor and get the highest possible throughput, set up a bean in your configuration:

@Configuration
@EnableReactor
public class AppConfig {

  @Bean
  public AsyncTaskExecutor singleThreadAsyncTaskExecutor(Environment env) {
    return new RingBufferAsyncTaskExecutor(env)
        .setName("ringBufferExecutor")
        .setBacklog(2048)
        .setProducerType(ProducerType.SINGLE)
        .setWaitStrategy(new YieldingWaitStrategy());
  }

}

Every component that uses this shared TaskExecutor will dispatch tasks to the exact same thread in the internal RingBuffer. Using ProducerType.SINGLE ensures the highest possible throughput but requires the component that is submitting tasks to be single-threaded. If that’s not the case and tasks are being dispatched into this executor from multiple threads, then leave these configuration options off and take the default, which is ProducerType.MULTI.

Multi-threaded RingBuffer Usage

To configure a ThreadPoolExecutor replacement that uses multiple threads, the configuration of the WorkQueueAsyncTaskExecutor is identical to the above, with the exception of being able to specify the number of threads to be used as event handlers.

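@Configuration
@EnableReactor
public class AppConfig {

  @Bean
  public AsyncTaskExecutor workQueueAsyncTaskExecutor(Environment env) {
    return new WorkQueueAsyncTaskExecutor(env)
        .setName("workQueueExecutor")
        .setBacklog(2048)
        .setThreads(4)
        .setProducerType(ProducerType.SINGLE)
        .setWaitStrategy(new YieldingWaitStrategy());
  }

}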

By default, the WorkQueueAsyncTaskExecutor will be configured with a thread count equal to the number of processors in the machine.

TcpClient

Reactor includes a powerful but easy-to-use TCP client that allows you to asynchronously communicate with TCP servers. The Codec support is common between the TcpClient and TcpServer, so whatever codecs work for the server will also work for the client. The TcpClient also includes support for heartbeat during connection idle times and intelligent reconnect in case of a connection being dropped.

Connecting to a Server

As with other Reactor components, a TcpClient isn’t created directly but is obtained through configuration of a TcpClientSpec. The only implementation included with Reactor by default is a Netty 4 implementation of TcpClient, but it’s important to note that other implementations could be created.

Create a TcpClientSpec, configure it with the appropriate codec and handler, and connect to the server:

Environment env = new Environment();

// create a spec using the Netty-based client
TcpClient<String,String> client = new TcpClientSpec<String,String>(NettyTcpClient.class)
    .env(env)
    .codec(StandardCodecs.LINE_FEED_CODEC)
    .connect("localhost",8080)
    .get();

client.open().consume(new Consumer<TcpConnection<String,String>>() {
  public void accept(TcpConnection<String,String> conn) {

    conn.in().consume(new Consumer<String>() {
      public void accept(String line) {
        // handle lines of incoming data
      }
    });

    // write data to the server
    conn.send("Hello World!");

  }
});

Smart Reconnect

The zero-argopen()method returns aPromise<TcpConnection<IN,OUT>>which will be fulfilled when the connection is made. APromiseis only fulfilled once,so the handler that will accept this connection will only ever be invoked the first time a connection is made using this client. If the transaction is short-lived and the network is reliable,that should be sufficient. But if the connections are long-lived (granted,a term which is open to interpretation) there is a good chance that the connection will be dropped for some reason.

Reactor’s TcpClient offers a smart reconnect option that allows you to specify whether or not,or how long to wait to attempt a reconnect and it’s even possible to change the host to which a reconnect attempt is made if your reconnect algorithm involves switching among a set of hosts to which you want to stay connected in a high-availability situation.
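The kind of decision described above (how long to wait, and when to move to another host) can be sketched as a small policy object in plain Java. ReconnectSketch, Decision, and the host strings are hypothetical and exist only for this illustration; they are not part of Reactor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical reconnect policy: fixed delays per attempt, then fail over to the next host.
final class ReconnectSketch {

  static final class Decision {
    final String host;
    final long delayMillis;

    Decision(String host, long delayMillis) {
      this.host = host;
      this.delayMillis = delayMillis;
    }
  }

  private final List<String> hosts;
  private final long[] delaysMillis;

  ReconnectSketch(List<String> hosts, long... delaysMillis) {
    if (hosts.isEmpty()) {
      throw new IllegalArgumentException("at least one host is required");
    }
    this.hosts = new ArrayList<>(hosts);
    this.delaysMillis = delaysMillis.clone();
  }

  // attempt is numbered from 1. While delays remain, retry the same host after the
  // configured delay; afterwards, switch to the next host in the list immediately.
  Decision next(String currentHost, int attempt) {
    if (attempt >= 1 && attempt <= delaysMillis.length) {
      return new Decision(currentHost, delaysMillis[attempt - 1]);
    }
    int index = hosts.indexOf(currentHost); // -1 for an unknown host, which maps to the first host
    return new Decision(hosts.get((index + 1) % hosts.size()), 0L);
  }

  public static void main(String[] args) {
    ReconnectSketch policy = new ReconnectSketch(Arrays.asList("a:8080", "b:8080"), 100L, 500L, 1000L);
    for (int attempt = 1; attempt <= 4; attempt++) {
      Decision d = policy.next("a:8080", attempt);
      System.out.println(attempt + " -> " + d.host + " after " + d.delayMillis + " ms");
    }
  }
}
```

Keeping the policy free of any networking code makes it easy to unit test the schedule on its own before wiring it into a client.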

To use the reconnect functionality, you need to pass an implementation of the Reconnect interface when you open a connection:

Environment env = new Environment();

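TcpClient<String,String> client = new TcpClientSpec<String,String>(NettyTcpClient.class)
    .env(env)
    .codec(StandardCodecs.LINE_FEED_CODEC)
    .connect("localhost",8080)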
    .get();

Stream<TcpConnection<String,String>> connections = client.open(new Reconnect() {
  public Tuple2<InetSocketAddress,Long> reconnect(InetSocketAddress currentAddress,int attempt) {

    // try reconnecting 3 times
    switch (attempt) {
      case 1:
        return Tuple.of(currentAddress,100L);
      case 2:
        return Tuple.of(currentAddress,500L);
      case 3:
        return Tuple.of(currentAddress,1000L);
      default:
        // try to connect somewhere else
        return Tuple.of(nextHost(currentAddress),0L);
    }

  }
});

connections.consume(new Consumer<TcpConnection<String,String>>() {
  public void accept(TcpConnection<String,String> conn) {
    // handle each connection,including reconnects
  }
});
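The nextHost(currentAddress) call in the example above is not defined in the snippet. The following is a minimal sketch of what such a helper and the retry schedule might look like in plain Java, independent of Reactor's types. The class name ReconnectPolicy, the Decision holder (a stand-in for Tuple2<InetSocketAddress,Long>), and the example host names are all assumptions made for illustration.

```java
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Reactor: mirrors the switch statement above
// using plain JDK types so it can be run on its own.
final class ReconnectPolicy {

  // Stand-in for Tuple2<InetSocketAddress,Long>: where to reconnect and after how many ms.
  static final class Decision {
    final InetSocketAddress address;
    final long delayMillis;

    Decision(InetSocketAddress address, long delayMillis) {
      this.address = address;
      this.delayMillis = delayMillis;
    }
  }

  private static final long[] DELAYS = {100L, 500L, 1000L};

  private final List<InetSocketAddress> hosts;

  ReconnectPolicy(List<InetSocketAddress> hosts) {
    if (hosts.isEmpty()) {
      throw new IllegalArgumentException("hosts must not be empty");
    }
    this.hosts = hosts;
  }

  // Rotate to the host after the current one, wrapping around.
  // A host that is not in the list maps to the first entry.
  InetSocketAddress nextHost(InetSocketAddress current) {
    int i = hosts.indexOf(current);
    return hosts.get((i + 1) % hosts.size());
  }

  // Attempts 1..3 retry the same host with growing delays; any other attempt moves on at once.
  Decision reconnect(InetSocketAddress current, int attempt) {
    if (attempt >= 1 && attempt <= DELAYS.length) {
      return new Decision(current, DELAYS[attempt - 1]);
    }
    return new Decision(nextHost(current), 0L);
  }

  public static void main(String[] args) {
    InetSocketAddress a = InetSocketAddress.createUnresolved("host-a.example", 8080);
    InetSocketAddress b = InetSocketAddress.createUnresolved("host-b.example", 8080);
    ReconnectPolicy policy = new ReconnectPolicy(Arrays.asList(a, b));
    for (int attempt = 1; attempt <= 4; attempt++) {
      Decision d = policy.reconnect(a, attempt);
      System.out.println(attempt + " -> " + d.address + " after " + d.delayMillis + "ms");
    }
  }
}
```

Inside a real Reconnect implementation, the same decision would be returned as Tuple.of(address, delayMillis).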
Heartbeats

The Netty client has an idle timeout ChannelListener which is exposed in Reactor via the ConsumerSpec returned from TcpClient.on(). The ConsumerSpec is a simple way to wire handlers to events that a component might emit throughout its lifetime. The TcpClient ConsumerSpec exposes three events: close, readIdle, and writeIdle.

To attach a handler to one of these events, call the appropriate method (using the Stream created in the example above):

connections.consume(new Consumer<TcpConnection<String,String>>() {
  public void accept(final TcpConnection<String,String> conn) {

    // handle each connection,including reconnects
    conn.on()
      .close(new Runnable() {
        public void run() {
          // handle connection close event
        }
      })
      .readIdle(5000,new Runnable() {
        public void run() {
          // no data read for 5 seconds,ping client
          conn.send("ping");
        }
      })
      .writeIdle(5000,new Runnable() {
        public void run() {
          // no data written for 5 seconds,ping client
          conn.send("ping");
        }
      });

  }
});
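The readIdle and writeIdle handlers above fire when nothing has been read or written for the configured interval. A minimal sketch of that rule follows; it is not how Netty or Reactor implement idle detection, and the IdleTracker class and its method names are assumptions made for illustration. Timestamps are passed in explicitly so the logic can be exercised without waiting.

```java
// Hypothetical bookkeeping for idle detection, written against plain longs
// so it can be run without Netty or Reactor on the classpath.
final class IdleTracker {

  private final long timeoutMillis;
  private long lastReadAt;
  private long lastWriteAt;

  IdleTracker(long timeoutMillis, long nowMillis) {
    this.timeoutMillis = timeoutMillis;
    this.lastReadAt = nowMillis;
    this.lastWriteAt = nowMillis;
  }

  // Record activity as it happens.
  void onRead(long nowMillis) { lastReadAt = nowMillis; }
  void onWrite(long nowMillis) { lastWriteAt = nowMillis; }

  // A direction is idle once the timeout has elapsed since its last activity.
  boolean isReadIdle(long nowMillis) { return nowMillis - lastReadAt >= timeoutMillis; }
  boolean isWriteIdle(long nowMillis) { return nowMillis - lastWriteAt >= timeoutMillis; }

  public static void main(String[] args) {
    IdleTracker tracker = new IdleTracker(5000, 0);
    tracker.onRead(4000);
    System.out.println("read idle at 6000ms: " + tracker.isReadIdle(6000));
    System.out.println("write idle at 6000ms: " + tracker.isWriteIdle(6000));
  }
}
```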

TcpServer

Reactor includes powerful abstractions for high-speed data ingestion via TCP. The TcpServer comes with a default implementation based on Netty 4 but is extensible, and other implementations could be plugged in. As with most components in Reactor, a TcpServer isn’t instantiated directly. Instead, a standard TcpServerSpec is created and configured, and the spec provides user code with the configured server, ready to start.

To create a TcpServer, create a TcpServerSpec<IN,OUT> whose generic signature matches the type of object you’re working with in your Consumers. If you’re not doing anything with the data, a default pass-through Codec will be used. The standard LineFeedCodec will turn incoming data into a String very efficiently and split it up based on a newline character. This is probably the easiest way to get started working with Reactor’s TCP support.

Environment env = new Environment();

// create a spec using the Netty-based server
TcpServer<String,String> server = new TcpServerSpec<String,String>(NettyTcpServer.class)
  .env(env)
  .codec(StandardCodecs.LINE_FEED_CODEC)
  .consume(new Consumer<NetChannel<String,String>>() {
    public void accept(NetChannel<String,String> conn) {

      // for each connection,process incoming data
      conn.in().consume(new Consumer<String>() {
        public void accept(String line) {
          // handle line feed data
        }
      });

    }
  })
  .get();

server.start();
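Splitting incoming data on a newline character, as described for the LineFeedCodec above, can be sketched in plain Java. This is an illustration of the framing idea only, not Reactor's implementation (which works on Buffers); the LineFramer class and its feed method are assumptions. The sketch keeps any trailing partial line until a later chunk completes it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for line-feed framing using Strings instead of Buffers.
final class LineFramer {

  private final StringBuilder pending = new StringBuilder();

  // Accept a chunk as it arrives and return every complete line it finishes.
  // Text after the last '\n' is kept until a later chunk completes the line.
  List<String> feed(String chunk) {
    pending.append(chunk);
    List<String> lines = new ArrayList<>();
    int newline;
    while ((newline = pending.indexOf("\n")) >= 0) {
      lines.add(pending.substring(0, newline));
      pending.delete(0, newline + 1);
    }
    return lines;
  }

  public static void main(String[] args) {
    LineFramer framer = new LineFramer();
    System.out.println(framer.feed("hello\nwor"));
    System.out.println(framer.feed("ld\n"));
  }
}
```

Because the framer holds per-stream state, one instance per connection would be needed if it were used this way.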
Codec Support

Reactor’s TcpServer and TcpClient support a Codec to transform data from and to raw Buffers. There are no special Encoder and Decoder interfaces to implement. The Codec functionality simply provides a way to hand off instances of Function<IN,OUT> to the server.

A good example Codec is the StringCodec, which creates a new Function for each connection because it internally uses a CharsetDecoder and CharsetEncoder for fast and efficient conversion of byte data to String. Here’s the entire implementation:

package reactor.tcp.encoding;

import reactor.function.Consumer;
import reactor.function.Function;
import reactor.io.Buffer;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;

public class StringCodec implements Codec<Buffer,String,String> {

  private final Charset utf8 = Charset.forName("UTF-8");

  @Override
  public Function<Buffer,String> decoder(Consumer<String> next) {
    return new StringDecoder(next);
  }

  @Override
  public Function<String,Buffer> encoder() {
    return new StringEncoder();
  }

  private class StringDecoder implements Function<Buffer,String> {
    private final Consumer<String> next;
    private final CharsetDecoder decoder = utf8.newDecoder();

    private StringDecoder(Consumer<String> next) {
      this.next = next;
    }

    @Override
    public String apply(Buffer bytes) {
      try {
        String s = decoder.decode(bytes.byteBuffer()).toString();
        if (null != next) {
          next.accept(s);
          return null;
        } else {
          return s;
        }
      } catch (CharacterCodingException e) {
        throw new IllegalStateException(e);
      }
    }
  }

  private class StringEncoder implements Function<String,Buffer> {
    private final CharsetEncoder encoder = utf8.newEncoder();

    @Override
    public Buffer apply(String s) {
      try {
        ByteBuffer bb = encoder.encode(CharBuffer.wrap(s));
        return new Buffer(bb);
      } catch (CharacterCodingException e) {
        throw new IllegalStateException(e);
      }
    }
  }

}

Whether the Codec returns a new instance of Function<IN,OUT> from the decoder(Consumer<T> next) and encoder() methods is domain-specific. In cases where transcoding is stateless, it’s safe to create the encoders and decoders as instance (or even static) members of the Codec implementation.
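For the stateless case, a minimal sketch of sharing one encoder and one decoder as static members might look like the following. It uses java.util.function.Function and byte arrays rather than Reactor's Function and Buffer types so that it runs without Reactor on the classpath. The class name StatelessUtf8Codec is an assumption, and the sketch assumes each byte array holds complete characters.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical codec shape, not Reactor's Codec interface.
final class StatelessUtf8Codec {

  // new String(bytes, charset) and String.getBytes(charset) keep no state between
  // calls, so one shared instance of each function can serve every connection.
  private static final Function<byte[], String> DECODER =
      bytes -> new String(bytes, StandardCharsets.UTF_8);
  private static final Function<String, byte[]> ENCODER =
      s -> s.getBytes(StandardCharsets.UTF_8);

  Function<byte[], String> decoder() {
    return DECODER;
  }

  Function<String, byte[]> encoder() {
    return ENCODER;
  }

  public static void main(String[] args) {
    StatelessUtf8Codec codec = new StatelessUtf8Codec();
    byte[] wire = codec.encoder().apply("Hello World!");
    System.out.println(codec.decoder().apply(wire));
  }
}
```

The StringCodec shown earlier takes the other route: it holds a CharsetDecoder and CharsetEncoder, which are not safe to share between threads, so it hands out a fresh Function per connection.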

Request/Reply

Although Reactor’s TCP support is a strong fit for high-volume ingest, you can use the TcpServer as a general TCP server. When ready to communicate asynchronously with the client, call the NetChannel.send(OUT) method.

TcpServer<String,String> server = new TcpServerSpec<String,String>(NettyTcpServer.class)
  .env(env)
  .codec(StandardCodecs.LINE_FEED_CODEC)
  .consume(new Consumer<NetChannel<String,String>>() {
    public void accept(final NetChannel<String,String> conn) {

      // for each connection, process incoming data
      conn.in().consume(new Consumer<String>() {
        public void accept(String line) {
          // handle line feed data

          // respond to client (newline will be added by codec)
          conn.send("Hello World!");
        }
      });

    }
  })
  .get()
  .start();

To obtain a Consumer<OUT> to pass to other components that expect a Consumer, call the NetChannel.out() method.

Consumer<NetChannel<String,String>> consumer = new Consumer<NetChannel<String,String>>() {
  public void accept(final NetChannel<String,String> conn) {

    final Consumer<String> out = conn.out();

    // result will be accepted into Consumer and passed to client
    service.doLongLookup(out);

  }
};
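The service object in the example above is not shown. As a rough sketch of what a component accepting such a Consumer might do, the following uses java.util.function.Consumer and CompletableFuture from the JDK instead of Reactor's types. Only the method name doLongLookup comes from the example; the LookupService class, the returned future, and the result string are assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical service: runs slow work off the calling thread and
// hands the result to whatever Consumer it was given.
final class LookupService {

  // The returned future is an assumption; it lets callers wait for completion.
  CompletableFuture<Void> doLongLookup(Consumer<String> out) {
    return CompletableFuture.supplyAsync(LookupService::slowLookup).thenAccept(out);
  }

  private static String slowLookup() {
    try {
      Thread.sleep(50); // simulate a slow lookup
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return "lookup result";
  }

  public static void main(String[] args) {
    // Stand-in for conn.out(): collects what would be written to the client.
    List<String> sent = new CopyOnWriteArrayList<>();
    new LookupService().doLongLookup(sent::add).join();
    System.out.println(sent);
  }
}
```

The service never needs to know about the connection; it only sees a Consumer, which keeps it reusable outside the TCP layer.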
