加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

使用Angular2 / RxJS读取缓冲响应

发布时间:2020-12-17 17:27:05 所属栏目:安全 来源:网络整理
导读:我正在建立一个从后端读取数据的网站.该数据即时计算并以缓冲方式发送回客户端.即一旦计算出第一个块,就会将其发送到客户端,然后计算下一个块并将其发送给客户端.整个过程发生在同一个HTTP请求中.客户端不应该等待完成响应完成,而是在发送后立即自己处理每个
我正在建立一个从后端读取数据的网站.该数据即时计算并以缓冲方式发送回客户端.即一旦计算出第一个块,就会将其发送到客户端,然后计算下一个块并将其发送给客户端.整个过程发生在同一个HTTP请求中.客户端不应该等待完成响应完成,而是在发送后立即自己处理每个块.通常可以使用XHR进度处理程序(例如 How to get progress from XMLHttpRequest)来消费这样的响应.

如何使用RxJS和Observables在Angular2中使用HttpModule消耗这样的响应?

编辑:peeskillet在下面给出了一个非常详细的答案.另外,我做了一些进一步挖掘,发现了feature request for the HttpModule of Angular和StackOverflow question with another approach on how to solve it.

解决方法

注意:以下答案仅为POC.它旨在教育Http的体系结构,并提供简单的工作POC实现.我们应该看一下 XHRConnection的来源,了解在实施时应该考虑的其他内容.

在尝试实现这一点时,我认为没有任何方法可以直接进入XHR.看起来我们可能需要提供使用Http所涉及的一些组件的自定义实现.我们应该考虑的三个主要组成部分是

>连接
> ConnectionBackend
> Http

Http将ConnectionBackend作为其构造函数的参数.当发出请求时,比如get,Http与ConnectionBackend.createConnection创建连接,并返回Connection的Observable属性(从createConnection返回).在最简单的(简化)视图中,它看起来像这样

class XHRConnection implements Connection {
  response: Observable<Response>;
  constructor( request,browserXhr) {
    this.response = new Observable((observer: Observer<Response>) => {
      let xhr = browserXhr.create();
      let onLoad = (..) => {
        observer.next(new Response(...));
      };
      xhr.addEventListener('load',onLoad);
    })
  }
}

class XHRBackend implements ConnectionBackend {
  constructor(private browserXhr) {}
  createConnection(request): XHRConnection {
    return new XHRConnection(request,this.broswerXhr).response;
  }
}

class Http {
  constructor(private backend: ConnectionBackend) {}

  get(url,options): Observable<Response> {
    return this.backend.createConnection(createRequest(url,options)).response;
  }
}

因此,了解这种架构,我们可以尝试实现类似的东西.

对于Connection,这是POC.为简洁而省略了导入,但在大多数情况下,所有内容都可以从@ angular / http导入,Observable / Observer可以从rxjs / {Type}导入.

export class Chunk {
  data: string;
}

export class ChunkedXHRConnection implements Connection {
  request: Request;
  response: Observable<Response>;
  readyState: ReadyState;

  chunks: Observable<Chunk>;

  constructor(req: Request,browserXHR: BrowserXhr,baseResponSEOptions?: ResponSEOptions) {
    this.request = req;
    this.chunks = new Observable<Chunk>((chunkObserver: Observer<Chunk>) => {
      let _xhr: XMLHttpRequest = browserXHR.build();
      let previousLen = 0;
      let onProgress = (progress: ProgressEvent) => {
        let text = _xhr.responseText;
        text = text.substring(previousLen);
        chunkObserver.next({ data: text });
        previousLen += text.length;

        console.log(`chunk data: ${text}`);
      };
      _xhr.addEventListener('progress',onProgress);
      _xhr.open(RequestMethod[req.method].toUpperCase(),req.url);
      _xhr.send(this.request.getBody());
      return () => {
        _xhr.removeEventListener('progress',onProgress);
        _xhr.abort();
      };
    });
  }
}

这是我们刚订阅XHR进展事件.由于XHR.responseText会发出整个连接文本,我们只需子串获取块,然后通过Observer发出每个chuck.

对于XHRBackend,我们有以下(没什么了不起的).同样,一切都可以从@ angular / http导入;

@Injectable()
export class ChunkedXHRBackend implements ConnectionBackend {
  constructor(
      private _browserXHR: BrowserXhr,private _baseResponSEOptions: ResponSEOptions,private _xsrfStrategy: XSRFStrategy) {}

  createConnection(request: Request): ChunkedXHRConnection {
    this._xsrfStrategy.configureRequest(request);
    return new ChunkedXHRConnection(request,this._browserXHR,this._baseResponSEOptions);
  }
}

对于Http,我们将扩展它,添加一个getChunks方法.如果需要,您可以添加更多方法.

@Injectable()
export class ChunkedHttp extends Http {
  constructor(protected backend: ChunkedXHRBackend,protected defaultOptions: RequestOptions) {
    super(backend,defaultOptions);
  }

  getChunks(url,options?: RequestOptionsArgs): Observable<Chunk> {
    return this.backend.createConnection(
       new Request(mergeOptions(this.defaultOptions,options,RequestMethod.Get,url))).chunks;
  }
}

mergeOptions方法可以在Http source中找到.

现在我们可以为它创建一个模块.用户应该直接使用ChunkedHttp而不是Http.但是因为不试图覆盖Http令牌,如果需要,你仍然可以使用Http.

@NgModule({
  imports: [ HttpModule ],providers: [
    {
      provide: ChunkedHttp,useFactory: (backend: ChunkedXHRBackend,options: RequestOptions) => {
        return new ChunkedHttp(backend,options);
      },deps: [ ChunkedXHRBackend,RequestOptions ]
    },ChunkedXHRBackend
  ]
})
export class ChunkedHttpModule {
}

我们导入HttpModule,因为它提供了我们需要注入的其他服务,但是如果我们不需要,我们不希望重新实现这些服务.

要测试只是将ChunkedHttpModule导入AppModule.另外要测试我使用了以下组件

@Component({
  selector: 'app',encapsulation: ViewEncapsulation.None,template: `
    <button (click)="onClick()">Click Me!</button>
    <h4 *ngFor="let chunk of chunks">{{ chunk }}</h4>
  `,styleUrls: ['./app.style.css']
})
export class App {
  chunks: string[] = [];

  constructor(private http: ChunkedHttp) {}

  onClick() {
    this.http.getChunks('http://localhost:8080/api/resource')
      .subscribe(chunk => this.chunks.push(chunk.data));
  }
}

我有一个后端端点设置,它每隔半秒就会在10个块中吐出“消息#x”.这就是结果

enter image description here

某处似乎有一个bug.只有九个:-).我认为它与服务器端有关.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读