博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spring Reactor 项目核心库
阅读量:6988 次
发布时间:2019-06-27

本文共 5603 字,大约阅读时间需要 18 分钟。

Reactor Core

Non-Blocking Foundation for the JVM both implementing a inspired API and efficient event streaming support.

Getting it

Reactor 3 requires Java 8 or + to run.

With Gradle from repo.spring.io or Maven Central repositories (stable releases only):

repositories {//      maven { url 'http://repo.spring.io/snapshot' }      maven { url 'http://repo.spring.io/milestone' }      mavenCentral()    }    dependencies {      //compile "io.projectreactor:reactor-core:3.1.4.RELEASE"      //testCompile("io.projectreactor:reactor-test:3.1.4.RELEASE")      compile "io.projectreactor:reactor-core:3.2.0.M1"      testCompile("io.projectreactor:reactor-test:3.2.0.M1")    }

See the

for more information on getting it (eg. using Maven, or on how to get milestones and snapshots).

Note about Android support: Reactor 3 doesn't officially support nor target Android.

However it should work fine with Android SDK 26 (Android O) and above. See the
in the reference guide.

Getting Started

New to Reactive Programming or bored of reading already ? Try the !

If you are familiar with RxJava or if you want to check more detailled introduction, be sure to check

!

Flux

A Reactive Streams Publisher with basic flow operators.

  • Static factories on Flux allow for source generation from arbitrary callbacks types.
  • Instance methods allows operational building, materialized on each Flux#subscribe(), Flux#subscribe() or multicasting operations such as Flux#publish and Flux#publishNext.

Flux in action :

Flux.fromIterable(getSomeLongList())    .mergeWith(Flux.interval(100))    .doOnNext(serviceA::someObserver)    .map(d -> d * 2)    .take(3)    .onErrorResumeWith(errorHandler::fallback)    .doAfterTerminate(serviceM::incrementTerminate)    .subscribe(System.out::println);

Mono

A Reactive Streams Publisher constrained to ZERO or ONE element with appropriate operators.

  • Static factories on Mono allow for deterministic zero or one sequence generation from arbitrary callbacks types.
  • Instance methods allows operational building, materialized on each Mono#subscribe() or Mono#get() eventually called.

Mono in action :

Mono.fromCallable(System::currentTimeMillis)    .flatMap(time -> Mono.first(serviceA.findRecent(time), serviceB.findRecent(time)))    .timeout(Duration.ofSeconds(3), errorHandler::fallback)    .doOnSuccess(r -> serviceM.incrementSuccess())    .subscribe(System.out::println);

Blocking Mono result :

Tuple2
nowAndLater = Mono.zip( Mono.just(System.currentTimeMillis()), Flux.just(1).delay(1).map(i -> System.currentTimeMillis())) .block();

Schedulers

Reactor uses a as a

contract for arbitrary task execution. It provides some guarantees required by Reactive
Streams flows like FIFO execution.

You can use or create efficient

to jump thread on the producing flows (subscribeOn) or receiving flows (publishOn):

Mono.fromCallable( () -> System.currentTimeMillis() )    .repeat()    .publishOn(Schedulers.single())    .log("foo.bar")    .flatMap(time ->        Mono.fromCallable(() -> { Thread.sleep(1000); return time; })            .subscribeOn(Schedulers.parallel())    , 8) //maxConcurrency 8    .subscribe();

ParallelFlux

can starve your CPU's from any sequence whose work can be subdivided in concurrent

tasks. Turn back into a Flux with ParallelFlux#sequential(), an unordered join or
use abitrary merge strategies via 'groups()'.

Mono.fromCallable( () -> System.currentTimeMillis() )    .repeat()    .parallel(8) //parallelism    .runOn(Schedulers.parallel())    .doOnNext( d -> System.out.println("I'm on thread "+Thread.currentThread()) )    .subscribe()

Custom sources : Flux.create and FluxSink, Mono.create and MonoSink

To bridge a Subscriber or Processor into an outside context that is taking care of

producing non concurrently, use Flux#create, Mono#create.

Flux.create(sink -> {         ActionListener al = e -> {            sink.next(textField.getText());         };         // without cancellation support:         button.addActionListener(al);         // with cancellation support:         sink.onCancel(() -> {            button.removeListener(al);         });    },    // Overflow (backpressure) handling, default is BUFFER    FluxSink.OverflowStrategy.LATEST)    .timeout(3)    .doOnComplete(() -> System.out.println("completed!"))    .subscribe(System.out::println)

The Backpressure Thing

Most of this cool stuff uses bounded ring buffer implementation under the hood to mitigate signal processing difference between producers and consumers. Now, the operators and processors or any standard reactive stream component working on the sequence will be instructed to flow in when these buffers have free room AND only then. This means that we make sure we both have a deterministic capacity model (bounded buffer) and we never block (request more data on write capacity). Yup, it's not rocket science after all, the boring part is already being worked by us in collaboration with on going research effort.

What's more in it ?

"Operator Fusion" (flow optimizers), health state observers, helpers to build custom reactive components, bounded queue generator, hash-wheel timer, converters from/to Java 9 Flow, Publisher and Java 8 CompletableFuture. The repository contains a reactor-test project with test features like the .


Reference Guide

Javadoc

Getting started with Flux and Mono

Reactor By Example

Head-First Spring & Reactor

Beyond Reactor Core

  • Everything to jump outside the JVM with the non-blocking drivers from .
  • provide for adapters and extra operators for Reactor 3.

Powered by

Licensed under

Sponsored by

转载地址:http://upzvl.baihongyu.com/

你可能感兴趣的文章
破解bmob云模糊查询收费 微信小程序端
查看>>
mysql索引使用经验总结
查看>>
【浅度渣文】Jackson之jackson-core
查看>>
吴恩达机器学习系列15:学习曲线
查看>>
记录 iView 的表单验证
查看>>
你可能并没有真正理解for-in
查看>>
block初窥
查看>>
扁平化图标的终极设计指南
查看>>
深度学习之线性回归模型
查看>>
【数据库】Redis集群篇
查看>>
[Basic] ASCII,Unicode 和 UTF-8
查看>>
数据结构 八大排序算法的时间复杂度 稳定性
查看>>
纯CSS 写动画背景,高仿蚂蚁庄园小鸡仔
查看>>
ES6快速入门(一)
查看>>
零基础该如何学习UI设计
查看>>
【云安全】阿里云云安全助理工程师认证(ACA)课程
查看>>
UIWebView中Objective C和JavaScript通信
查看>>
指弹赞美 技术服务支持
查看>>
领酌酒业:白酒行业真的那么暴利?
查看>>
拆掉城墙 苹果终于宣布要对外公布AI研究成果
查看>>