项目中有一个功能,通过SFTP读取服务器节点上的PCAP文件进行解析,如何通过SFTP访问文件网上代码很多,不是重点就不贴了,解析的时候一直报错:Java Code Examples for io.netty.handler.codec.TooLongFrameException. 定位到问题在于netty解析时出现问题,数据包超出长度,netty在解析数据包是按照固定大小解析,说明我读写文件有问题,后来定位到IO write(buf,len)和write(buf)的原因,如果read(byte[] buf)的buf 为1024,最后一次read,文件的内容小于1024,比如100,write(buf)还是写1024大小的字节,那么924字节是什么?测试发现,是上一次读存下的内容。这样就导致我的数据包多出924个字节,这就是问题所在。解决办使用write(buf,len)。 java pipeline并发模式: 1、pipeline简介 更多请关注我的个人博客。
pipeline又称为管道,是一种在计算机普遍使用的技术。举个最普遍的例子,如下图所示cpu流水线,一个流水线分为4部分,每个部分可以独立工作,于是可以处理多个数据流。Linux 管道也是一个常用的管道技术,其字符处理功能十分强大,在面试过程中常会被问到。在分布式处理领域,由于管道模式是数据驱动,而目前流行的Spark分布式处理平台也是数据驱动的,两者非常合拍,于是在spark的新的api里面pipeline模式得到了广泛的应用。还有Java web中的struct的filter、netty的pipeline,无处不见pipeline模式。因此,本小结的目标是使用java编写一个简易的pipeline小程序,并进行相应的功能,性能 测试。
2、设计思路 我以字符串处理为例,通常对字符串的处理不会一步就完成,需要分成几部来完成,而并行的处理方式不适合于大量数据流的场景。于是本节的目的是实现一个并行字符串处理程序。设计思路如下图所示,参考了netty的管道模型。
一个管道中包含多个channelhandler、每个handler实现了具体的处理方法,每个handler都唯一对应了一个context,在pipeline中,所有的context是构成一个链式结构的,链式结构是有序的,于是context的先后顺序决定了处理的先后顺序,pipeline可以通过context来调用handler。每个handler实际上只是一个接口,需要用户去自己实现。
3、Handler接口 该接口只需要实现字符串处理,然后把结果传给下一个handler就可以了。代码入下,接口参数HandlerContext为下一个handler的context,而Object为从上一个handler处理之后传入的结果。
[java] view plain copy public interface Handler { void channelRead(HandlerContext ctx,Object msg); }
4、HandlerContext HandlerContext的作用比较大,首先它是链表的一部分,因此需要有指向下一个context的指针;然后它负责调用handler,而我们要实现一个并发的处理程序,那么HandlerContext就需要维护一个线程池来供handler处理。具体代码如下:
[java] view plain copy public class HandlerContext { private ExecutorService executor= Executors.newCachedThreadPool();//线程池 private Handler handler; private HandlerContext next;//下一个context的引用 public HandlerContext(Handler handler){ this.handler=handler; } public void setNext(HandlerContext ctx){ this.next=ctx; } public void doWork(Object msg){//执行任务的时候向线程池提交一个runnable的任务,任务中调用handler if(next==null){ return; }else { executor.submit(new Runnable() { @Override public void run() { handler.channelRead(next,msg);//把下一个handler的context穿个handler来实现回调 } }); } //handler.channelRead(next,msg); } public void write(Object msg){//这里的write操作是给handler调用的,实际上是一个回调方法,当handler处理完数据之后,调用一下nextcontext.write,此时就把任务传递给下一个handler了。 doWork(msg); } }
5、pipeline pipeline维护了handlercontext链表,对该链表进行增删改操作,同时他对外提供了整个pipeline的调用接口。其代码如下:
[java] view plain copy public class MyPipeline { private HandlerContext head;//链表头 private HandlerContext tail;//链表尾,如果是一个双向链表,这个成员将会被用到,netty就使用的双向链表,因为是全双工的。 public void addFirst(Handler handler){//这里仅仅实现了一个简单的插入操作,即在链表的头部出入一个handler。 HandlerContext ctx=new HandlerContext(handler); HandlerContext tmp=head; head=ctx; head.setNext(tmp); } public MyPipeline(){ head=tail=new HeadContext(new HeadHandler()); } public void Request(Object msg){//封装了外部调用接口 head.doWork(msg); } final class HeadContext extends HandlerContext{//这是一个内部类,为默认handler的context public HeadContext(Handler handler) { super(handler); } } final class HeadHandler implements Handler{//这是一个内部类,是pipeline的默认处理handler。 @Override public void channelRead(HandlerContext ctx,Object msg) { String result=(String)msg+”end”; System.out.println(result); } } }
6、自定义handler 下面为了测试,将自定义两个handler,分别为TestHandler1、TestHandler2,连个handler的作用为在字符串后面加上特定的后缀。其实现如下:
6.1 TestHandler1 [java] view plain copy public class TestHandler1 implements Handler{ @Override public void channelRead(HandlerContext ctx,Object msg) { //// TODO: 2016/11/22 try { Thread.sleep(1000);//模拟阻塞 } catch (InterruptedException e) { e.printStackTrace(); } String result=(String)msg+”-handler1”;//在字符串后面加特定字符串 System.out.println(result); ctx.write(result);//写入操作,这个操作是必须的,相当于将结果传递给下一个handler } }
6.2 TestHandler2 [java] view plain copy public class TestHandler2 implements Handler{ @Override public void channelRead(HandlerContext ctx,Object msg) { //// TODO: 2016/11/22 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } String result=(String)msg+”-handler2”; System.out.println(result); ctx.write(result); } }
7、测试 如下所示的代码中,先定义一个pipeline,然后在pipeline中添加两个handler,然后为了测试并发性能,分别请求多次。
[java] view plain copy public class Main { public static void main(String[] args){ MyPipeline pipeline=new MyPipeline(); pipeline.addFirst(new TestHandler2());//添加handler1 pipeline.addFirst(new TestHandler1());//添加handler2 for(int i=0;i<10;i++){//提交多个任务 pipeline.Request(“hello”+i); } } }
8、测试结果 测试结果可以看到,10个任务都正确执行了(在字符串后面加了两个特定字符串),由于并发执行,其输出结果并不是按照任务提交的时候的顺序输出的。经过统计,程序运行时间大约2秒,正好为模拟两个handler阻塞的时间。如果是单线程程序的话,每个请求都会阻塞2秒,10个任务就是20秒,会比我写的并行程序慢很多。
[java] view plain copy hello0-handler1 hello5-handler1 hello4-handler1 hello3-handler1 hello2-handler1 hello1-handler1 hello7-handler1 hello6-handler1 hello8-handler1 hello9-handler1 hello0-handler1-handler2 hello4-handler1-handler2 hello2-handler1-handler2 hello5-handler1-handler2 hello1-handler1-handler2 hello6-handler1-handler2 hello8-handler1-handler2 hello7-handler1-handler2 hello3-handler1-handler2 hello9-handler1-handler2 (编辑:李大同)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|