加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

java – Vert.x事件循环 – 这是怎么异步的?

发布时间:2020-12-14 05:34:39 所属栏目:Java 来源:网络整理
导读:我正在使用Vert.x和相当于基于事件循环的服务器,而不是线程/连接模型. public void start(FutureVoid fut) { vertx .createHttpServer() .requestHandler(r - { LocalDateTime start = LocalDateTime.now(); System.out.println("Request received - "+start
我正在使用Vert.x和相当于基于事件循环的服务器,而不是线程/连接模型.
public void start(Future<Void> fut) {
    vertx
        .createHttpServer()
        .requestHandler(r -> {
            LocalDateTime start = LocalDateTime.now();
            System.out.println("Request received - "+start.format(DateTimeFormatter.ISO_DATE_TIME));
            final MyModel model = new MyModel();
            try {

                for(int i=0;i<10000000;i++){
                    //some simple operation
                }

                model.data = start.format(DateTimeFormatter.ISO_DATE_TIME) +" - "+LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME);

            } catch (Exception e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }

          r.response().end(
                  new Gson().toJson(model)
                 );
        })
        .listen(4568,result -> {
          if (result.succeeded()) {
            fut.complete();
          } else {
            fut.fail(result.cause());
          }
        });
    System.out.println("Server started ..");
  }

>我只是想模拟一个长时间运行的请求处理程序来了解这个模型的工作原理.
>我观察到的是所谓的事件循环被阻塞,直到我的第一个请求完成.无论什么时间需要,后续请求不会被执行,直到前一个完成.
显然,我在这里错过了一件,这就是我在这里的问题.

根据答案编辑到目前为止:

>不接受被认为是异步的所有请求?如果一个新的
只有当前一个清除时才能接受连接
关闭,如何异步?

>假设典型的请求需要100 ms到1 sec之间(根据请求的种类和性质).所以这意味着
事件循环直到先前的请求才能接受新的连接
完成(即使它在一秒钟内风起来).如果我是一个程序员
必须考虑所有这些,并将这些请求处理程序推送到
工作线程,那么它如何与线程/连接有所不同
模型?
>我只是想了解传统线程/连接服务器型号的这个模型是如何更好的?假设没有I / O操作或
所有的I / O操作是异步处理的?它甚至如何解决
c10k问题,当它不能并行启动所有并发请求,并且必须等到上一个终止?

>即使我决定将所有这些操作推送到工作线程(pooled),那么我回到同样的问题呢?上下文切换线程?
编辑并填补这个问题的赏金

>不完全明白这个模型是如何声称异步的.
> Vert.x有一个异步JDBC客户端(Asyncronous是关??键字),我试图用RXJava进行调整.
>这里是代码示例(相关部分)

server.requestStream().toObservable().subscribe(req – > {

LocalDateTime start = LocalDateTime.now();
        System.out.println("Request for " + req.absoluteURI() +" received - " +start.format(DateTimeFormatter.ISO_DATE_TIME));
        jdbc.getConnectionObservable().subscribe(
                conn -> {

                    // Now chain some statements using flatmap composition
                    Observable<ResultSet> resa = conn.queryObservable("SELECT * FROM CALL_OPTION WHERE UNDERLYING='NIFTY'");
                    // Subscribe to the final result
                    resa.subscribe(resultSet -> {

                        req.response().end(resultSet.getRows().toString());
                        System.out.println("Request for " + req.absoluteURI() +" Ended - " +LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
                    },err -> {
                        System.out.println("Database problem");
                        err.printStackTrace();
                    });
                },// Could not connect
                err -> {
                    err.printStackTrace();
                }
                );

});
server.listen(4568);

>选择查询需要3秒钟,大约返回完整的表转储.
>当我启动并发请求(仅使用2)时,我看到第二个请求完全等待第一个请求完成.
>如果JDBC select是异步的,在等待select查询返回任何东西的时候,框架是否处理第二个连接是不是一个很好的期望?

解决方法

事实上,Vert.x事件循环是许多平台上存在的经典事件循环.当然,对于Node.js来说,大多数解释和文档都可以找到,因为它是基于这种架构模式的最流行的框架.在Node.js事件循环下,看看一个或多或少的优秀的机械师 explanation. Vert.x tutorial在“不要打电话给我们,我们打电话给你”和“Verticles”也有很好的解释.

编辑您的更新:

首先,当您处理事件循环时,主线程对于所有请求都应该很快地工作.你不应该在这个循环中做任何长工作.当然你也不应该等到你对数据库的回应.
– 异步调度一个调用
– 将回调(处理程序)分配给结果
– 回调将在工作线程,非事件循环线程中执行.例如,此回调将返回对套接字的响应.
所以,你在事件循环中的操作应该只是调用所有异步操作与回调,并转到下一个请求,而不等待任何结果.

Assume a typical request takes anywhere between 100 ms to 1 sec (based on the kind and nature of the request).

在这种情况下,您的请求有一些计算昂贵的部分或访问IO – 您的事件循环中的代码不应该等待此操作的结果.

I’m just trying to understand how is this model better from a traditional thread/conn server models? Assume there is no I/O op or all the I/O op are handled asynchronously?

当您有太多的并发请求和传统的编程模型时,您将根据每个请求进行线程.这个线程会做什么?他们将主要等待IO操作(例如,数据库的结果).浪费资源在我们的事件循环模型中,您有一个主线程用于计划操作,并为长任务预先分配了工作线程数量.没有一个这个工作人员实际上等待响应,他们只需执行另一个代码,同时等待IO结果(可以实现为当前正在进行的IO作业的回调或定期检查状态).我建议您通过Java NIO和Java NIO 2来了解如何在框架内实际执行此异步IO. Green threads也是非常相关的概念,那将是很好的理解.绿色线程和协同程序是一种阴影事件循环,试图实现相同的事情 – 少线程,因为我们可以重用系统线程,而绿色线程等待某事.

How does it even solve c10k problem,when it can’t start all concurrent requests parallel and have to wait till the previous one terminates?

确定我们不要等待主线程发送先前请求的响应.获取请求,安排长/ IO任务执行,下次请求.

Even if I decide to push all these operations to a worker thread(pooled),then I’m back to the same problem isn’t it? Context switching between threads?

如果你使一切正确 – 不.更重要的是,您将获得良好的数据位置和执行流预测.一个CPU内核将执行您的短事件循环并调度异步工作,无需上下文切换,而无需再进行任何操作.其他核心调用数据库并返回响应,只有这样.在回调之间切换或检查不同通道的IO状态实际上不需要任何系统线程的上下文切换 – 它实际上在一个工作线程中工作.因此,我们每个核心都有一个工作线程,这个系统线程等待/检查从多个连接到数据库的结果可用性.回顾Java NIO概念,了解它如何以这种方式工作. (NIO代理服务器的典型示例 – 可以接受多个并行连接(数千个)的代理服务器,向其他一些远程服务器发送代理请求,收听响应并将响应发送回客户端,所有这些使用一个或两个线程)

关于你的代码,我为你做了一个样例project来证明一切都符合预期:

public class MyFirstVerticle extends AbstractVerticle {

    @Override
    public void start(Future<Void> fut) {
        JDBCClient client = JDBCClient.createShared(vertx,new JsonObject()
                .put("url","jdbc:hsqldb:mem:test?shutdown=true")
                .put("driver_class","org.hsqldb.jdbcDriver")
                .put("max_pool_size",30));


        client.getConnection(conn -> {
            if (conn.failed()) {throw new RuntimeException(conn.cause());}
            final SQLConnection connection = conn.result();

            // create a table
            connection.execute("create table test(id int primary key,name varchar(255))",create -> {
                if (create.failed()) {throw new RuntimeException(create.cause());}
            });
        });

        vertx
            .createHttpServer()
            .requestHandler(r -> {
                int requestId = new Random().nextInt();
                System.out.println("Request " + requestId + " received");
                    client.getConnection(conn -> {
                         if (conn.failed()) {throw new RuntimeException(conn.cause());}

                         final SQLConnection connection = conn.result();

                         connection.execute("insert into test values ('" + requestId + "','World')",insert -> {
                             // query some data with arguments
                             connection
                                 .queryWithParams("select * from test where id = ?",new JsonArray().add(requestId),rs -> {
                                     connection.close(done -> {if (done.failed()) {throw new RuntimeException(done.cause());}});
                                     System.out.println("Result " + requestId + " returned");
                                     r.response().end("Hello");
                                 });
                         });
                     });
            })
            .listen(8080,result -> {
                if (result.succeeded()) {
                    fut.complete();
                } else {
                    fut.fail(result.cause());
                }
            });
    }
}

@RunWith(VertxUnitRunner.class)
public class MyFirstVerticleTest {

  private Vertx vertx;

  @Before
  public void setUp(TestContext context) {
    vertx = Vertx.vertx();
    vertx.deployVerticle(MyFirstVerticle.class.getName(),context.asyncAssertSuccess());
  }

  @After
  public void tearDown(TestContext context) {
    vertx.close(context.asyncAssertSuccess());
  }

  @Test
  public void testMyApplication(TestContext context) {
      for (int i = 0; i < 10; i++) {
          final Async async = context.async();
          vertx.createHttpClient().getNow(8080,"localhost","/",response -> response.handler(body -> {
                                context.assertTrue(body.toString().contains("Hello"));
                                async.complete();
                            })
        );
    }
  }
}

输出:

Request 1412761034 received
Request -1781489277 received
Request 1008255692 received
Request -853002509 received
Request -919489429 received
Request 1902219940 received
Request -2141153291 received
Request 1144684415 received
Request -1409053630 received
Request -546435082 received
Result 1412761034 returned
Result -1781489277 returned
Result 1008255692 returned
Result -853002509 returned
Result -919489429 returned
Result 1902219940 returned
Result -2141153291 returned
Result 1144684415 returned
Result -1409053630 returned
Result -546435082 returned

所以,我们接受请求 – 调度请求到数据库,去下一个请求,我们消耗所有它们,并发送每个请求的响应只有当所有的数据库完成.

关于你的代码示例我看到两个可能的问题 – 首先,看起来你不关闭()连接,这是重要的返回到池.二,池如何配置?如果只有一个免费连接 – 这个请求将序列化等待这个连接.

我建议您添加一些打印时间戳的两个请求,以查找序列化的位置.你把一些让事件循环阻塞的调用.或者…检查您在测试中并发发送请求.以前没有得到响应之后.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读