java – Vert.x事件循环 – 这是怎么异步的?
我正在使用Vert.x和相当于基于事件循环的服务器,而不是线程/连接模型.
public void start(Future<Void> fut) { vertx .createHttpServer() .requestHandler(r -> { LocalDateTime start = LocalDateTime.now(); System.out.println("Request received - "+start.format(DateTimeFormatter.ISO_DATE_TIME)); final MyModel model = new MyModel(); try { for(int i=0;i<10000000;i++){ //some simple operation } model.data = start.format(DateTimeFormatter.ISO_DATE_TIME) +" - "+LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME); } catch (Exception e1) { // TODO Auto-generated catch block e1.printStackTrace(); } r.response().end( new Gson().toJson(model) ); }) .listen(4568,result -> { if (result.succeeded()) { fut.complete(); } else { fut.fail(result.cause()); } }); System.out.println("Server started .."); } >我只是想模拟一个长时间运行的请求处理程序来了解这个模型的工作原理. 根据答案编辑到目前为止: >不接受被认为是异步的所有请求?如果一个新的 >假设典型的请求需要100 ms到1 sec之间(根据请求的种类和性质).所以这意味着 >即使我决定将所有这些操作推送到工作线程(pooled),那么我回到同样的问题呢?上下文切换线程? >不完全明白这个模型是如何声称异步的. server.requestStream().toObservable().subscribe(req – > { LocalDateTime start = LocalDateTime.now(); System.out.println("Request for " + req.absoluteURI() +" received - " +start.format(DateTimeFormatter.ISO_DATE_TIME)); jdbc.getConnectionObservable().subscribe( conn -> { // Now chain some statements using flatmap composition Observable<ResultSet> resa = conn.queryObservable("SELECT * FROM CALL_OPTION WHERE UNDERLYING='NIFTY'"); // Subscribe to the final result resa.subscribe(resultSet -> { req.response().end(resultSet.getRows().toString()); System.out.println("Request for " + req.absoluteURI() +" Ended - " +LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME)); },err -> { System.out.println("Database problem"); err.printStackTrace(); }); },// Could not connect err -> { err.printStackTrace(); } ); }); server.listen(4568); >选择查询需要3秒钟,大约返回完整的表转储. 解决方法
事实上,Vert.x事件循环是许多平台上存在的经典事件循环.当然,对于Node.js来说,大多数解释和文档都可以找到,因为它是基于这种架构模式的最流行的框架.在Node.js事件循环下,看看一个或多或少的优秀的机械师
explanation.
Vert.x tutorial在“不要打电话给我们,我们打电话给你”和“Verticles”也有很好的解释.
编辑您的更新: 首先,当您处理事件循环时,主线程对于所有请求都应该很快地工作.你不应该在这个循环中做任何长工作.当然你也不应该等到你对数据库的回应.
在这种情况下,您的请求有一些计算昂贵的部分或访问IO – 您的事件循环中的代码不应该等待此操作的结果.
当您有太多的并发请求和传统的编程模型时,您将根据每个请求进行线程.这个线程会做什么?他们将主要等待IO操作(例如,数据库的结果).浪费资源在我们的事件循环模型中,您有一个主线程用于计划操作,并为长任务预先分配了工作线程数量.没有一个这个工作人员实际上等待响应,他们只需执行另一个代码,同时等待IO结果(可以实现为当前正在进行的IO作业的回调或定期检查状态).我建议您通过Java NIO和Java NIO 2来了解如何在框架内实际执行此异步IO. Green threads也是非常相关的概念,那将是很好的理解.绿色线程和协同程序是一种阴影事件循环,试图实现相同的事情 – 少线程,因为我们可以重用系统线程,而绿色线程等待某事.
确定我们不要等待主线程发送先前请求的响应.获取请求,安排长/ IO任务执行,下次请求.
如果你使一切正确 – 不.更重要的是,您将获得良好的数据位置和执行流预测.一个CPU内核将执行您的短事件循环并调度异步工作,无需上下文切换,而无需再进行任何操作.其他核心调用数据库并返回响应,只有这样.在回调之间切换或检查不同通道的IO状态实际上不需要任何系统线程的上下文切换 – 它实际上在一个工作线程中工作.因此,我们每个核心都有一个工作线程,这个系统线程等待/检查从多个连接到数据库的结果可用性.回顾Java NIO概念,了解它如何以这种方式工作. (NIO代理服务器的典型示例 – 可以接受多个并行连接(数千个)的代理服务器,向其他一些远程服务器发送代理请求,收听响应并将响应发送回客户端,所有这些使用一个或两个线程) 关于你的代码,我为你做了一个样例project来证明一切都符合预期: public class MyFirstVerticle extends AbstractVerticle { @Override public void start(Future<Void> fut) { JDBCClient client = JDBCClient.createShared(vertx,new JsonObject() .put("url","jdbc:hsqldb:mem:test?shutdown=true") .put("driver_class","org.hsqldb.jdbcDriver") .put("max_pool_size",30)); client.getConnection(conn -> { if (conn.failed()) {throw new RuntimeException(conn.cause());} final SQLConnection connection = conn.result(); // create a table connection.execute("create table test(id int primary key,name varchar(255))",create -> { if (create.failed()) {throw new RuntimeException(create.cause());} }); }); vertx .createHttpServer() .requestHandler(r -> { int requestId = new Random().nextInt(); System.out.println("Request " + requestId + " received"); client.getConnection(conn -> { if (conn.failed()) {throw new RuntimeException(conn.cause());} final SQLConnection connection = conn.result(); connection.execute("insert into test values ('" + requestId + "','World')",insert -> { // query some data with arguments connection .queryWithParams("select * from test where id = ?",new JsonArray().add(requestId),rs -> { connection.close(done -> {if (done.failed()) {throw new RuntimeException(done.cause());}}); System.out.println("Result " + requestId + " returned"); r.response().end("Hello"); }); }); }); }) .listen(8080,result -> { if (result.succeeded()) { fut.complete(); } else { fut.fail(result.cause()); } }); } } @RunWith(VertxUnitRunner.class) public class MyFirstVerticleTest { private Vertx vertx; @Before public void setUp(TestContext context) { vertx = Vertx.vertx(); vertx.deployVerticle(MyFirstVerticle.class.getName(),context.asyncAssertSuccess()); } @After public void tearDown(TestContext context) { vertx.close(context.asyncAssertSuccess()); } @Test public void testMyApplication(TestContext context) { for (int i = 0; i < 10; i++) { final Async async = context.async(); vertx.createHttpClient().getNow(8080,"localhost","/",response -> response.handler(body -> { context.assertTrue(body.toString().contains("Hello")); async.complete(); }) ); } } } 输出: Request 1412761034 received Request -1781489277 received Request 1008255692 received Request -853002509 received Request -919489429 received Request 1902219940 received Request -2141153291 received Request 1144684415 received Request -1409053630 received Request -546435082 received Result 1412761034 returned Result -1781489277 returned Result 1008255692 returned Result -853002509 returned Result -919489429 returned Result 1902219940 returned Result -2141153291 returned Result 1144684415 returned Result -1409053630 returned Result -546435082 returned 所以,我们接受请求 – 调度请求到数据库,去下一个请求,我们消耗所有它们,并发送每个请求的响应只有当所有的数据库完成. 关于你的代码示例我看到两个可能的问题 – 首先,看起来你不关闭()连接,这是重要的返回到池.二,池如何配置?如果只有一个免费连接 – 这个请求将序列化等待这个连接. 我建议您添加一些打印时间戳的两个请求,以查找序列化的位置.你把一些让事件循环阻塞的调用.或者…检查您在测试中并发发送请求.以前没有得到响应之后. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |