加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

数据处理----Java数据处理之RMI

发布时间:2020-12-14 02:18:46 所属栏目:大数据 来源:网络整理
导读:这个专题的目的就是想梳理下在Hadoop等等分布式系统中可能用到的一些知识。在分布式系统中,数据怎么组织,怎么转换,怎么保存,怎么交换,保证数据数据有清楚的语义,合理的流程,一定的效率,有很多问题在里面。而这些复杂的技术,其实也是由很多基础组合

这个专题的目的就是想梳理下在Hadoop等等分布式系统中可能用到的一些知识。在分布式系统中,数据怎么组织,怎么转换,怎么保存,怎么交换,保证数据数据有清楚的语义,合理的流程,一定的效率,有很多问题在里面。而这些复杂的技术,其实也是由很多基础组合而成的。有了牢固的基础,很多东西可以搭配在一起,自然就成了所谓的框架或者平台,正所谓罗马不是一天建成的。

一、RMI

?????? RMI全称是Remote Method Invocation-远程方法调用,它提供了一种跨JVM方法调用的机制,Since JDK1.1。其实它可以被看作是RPC(Remote Procdure Call)的Java版本。Java RMI 支持存储于不同地址空间的程序级对象之间彼此进行Socket通信,实现远程对象之间的无缝远程调用。

?????? RMI目前使用Java远程消息交换协议JRMP(Java Remote Messaging Protocol)进行通信。由于JRMP是专为Java对象制定的,用Java RMI开发的应用系统可以部署在任何支持JRE(Java Run Environment Java,运行环境)的平台上。但由于JRMP是专为Java对象制定的,因此,RMI对于用非Java语言开发的应用系统的支持不足。RMI可利用标准Java本机方法接口JNI与现有的和原有的系统相连接。RMI还可利用标准JDBC包与现有的关系数据库连接。

????? RMI原理:(注意:下图中的箭头是双向的)

?????

??????? 整个流程不多说,想访问别人提供的服务,就是Client;提供服务给别人来访问的就是Server。这里有两个概念要注意:Stub和Skeleton。

?????? Stub????? 每个远程对象都包含一个代理对象Stub,当运行在本地Java虚拟机上的程序调用运行在远程Java虚拟机上的对象方法时,它首先在本地创建该对象的代理对象Stub,然后调? 用代理对象上匹配的方法,代理对象会作如下工作:
????? 与远程对象所在的虚拟机建立连接
????? 打包(marshal)参数并发送到远程虚拟机
????? 等待执行结果
????? 解包(unmarshal)返回值或返回的错误
??? ? 返回调用结果给调用程序
???? Stub 对象负责调用参数和返回值的流化(serialization)、打包解包,以及网络层的通讯过程。
???? Skeleton??? 每一个远程对象同时也包含一个Skeleton对象,Skeleton运行在远程对象所在的虚拟机上,接受来自Stub对象的调用。当Skeleton接收到来自Stub对象的调用请求后,Skeleton会作如下工作:
????? 解包Stub传来的参数
????? 调用远程对象匹配的方法
???? 打包返回值或错误发送给Stub对象
???? 注意:远程对象的Stub和Skeleton对象都是由rmic编译工具产生的。

???? 上一个简单的例子吧:

1.定义远程服务接口?? ITranslate.java

??? import java.rmi.*;

public interface ITranslate extends Remote {
?? ?public String en2ch(String str) throws RemoteException;
}

2.实现远程服务接口?? ITranslateImpl.java

import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;

public class ITranslateImpl extends UnicastRemoteObject? implements ITranslate {
?? ?private static final long serialVersionUID = -8492344530612173938L;
?? ?protected ITranslateImpl() throws RemoteException {
??????? super();
??? }
?? ?@Override
?? ?public String en2ch(String str) throws RemoteException {
?? ??? ?if(str!=null&&str.equals("hello")){
?? ??? ??? ?return "你好";
?? ??? ?}else if(str!=null&&str.equals("world")){
?? ??? ??? ?return "世界";
?? ??? ?}else
?? ??? ?return "不知道怎么说";
?? ?}

}

3. 服务端程序? RMIServer.java

import java.rmi.Naming;
import java.rmi.RMISecurityManager;
import java.rmi.registry.LocateRegistry;

public class RMIServer
{
???? public static void main(String[] args) throws Exception
???? {
????????? //System.setSecurityManager(new RMISecurityManager()); 注释了就可以不要policy文件了,否则必须处理,具体可以自己试试
????????? ITranslate t = new ITranslateImpl();
????????? //LocateRegistry.createRegistry(1099);
????????? Naming.bind("TranslatorService",t);
????????? System.out.println("RMI server started and provide service now...");
???? }
}

4.客户端访问?? RMIClient.java

import java.rmi.*;
import java.net.MalformedURLException;

public class RMIClient
{
??? public static void main(String[] args) throws Exception
??? {
??????? //System.setSecurityManager(new RMISecurityManager()); 注释了就可以不要policy文件了,否则必须处理,具体可以自己试试
??????? try
??????? {
?????? ??? ?ITranslate t = (ITranslate)Naming.lookup("rmi://localhost/TranslatorService");
?????? ??? ?String str = "world";
??????????? String rs = t.en2ch("world");
??????????? System.out.println(str+"中文意思就是" + rs);
??????? }
??????? catch (MalformedURLException e)
??????? {
??????????? e.printStackTrace();
??????? }
??????? catch (RemoteException e)
??????? {
??????????? e.printStackTrace();
??????? }
??????? catch (NotBoundException e)
??????? {
??????????? e.printStackTrace();
??????? }
??? }
}

运行步骤:

1.编译? javac –d . *.java

2.通过RMIC生成Stub(这一步可以不要做). RMIC是什么,RMI compile,在JAVA_HOME/bin下面(bin下面有哪里,看你能说出哪些,java,javac等等,说一个算一分,看你能拿多少分,可以大概看出你的java level哦,不要说你没有用到,不信试试!)

?? rmic org.test.rmi.ITranslateImpl

从上图的警告可以看出:为JRMP生成和使用骨架及静态存根已经过时。骨架不再必要,而静态存根已由动态生成的存根取代。建议用户不再使用rmic来生成骨架和静态存根。说的已经很明白了。

当然做了这步后可以在classes文件里面找到一个ITranslateImpl_Stub.class这个是自动生成的字节码,一起反编译看看内容吧(有点稍多,还是看看吧):

package org.test.rmi;

import java.lang.reflect.Method;
import java.rmi.RemoteException;
import java.rmi.UnexpectedException;
import java.rmi.server.RemoteObject;
import java.rmi.server.RemoteRef;
import java.rmi.server.RemoteStub;

public final class ITranslateImpl_Stub extends RemoteStub
? implements ITranslate
{
? private static final long serialVersionUID = 2L;
? private static Method $method_en2ch_0;

? static
? {
??? try
??? {
????? $method_en2ch_0 = ITranslate.class.getMethod("en2ch",new Class[] { String.class });
??? }
??? catch (NoSuchMethodException localNoSuchMethodException)
??? {
????? throw new NoSuchMethodError("stub class initialization failed");
??? }
? }

? public ITranslateImpl_Stub(RemoteRef paramRemoteRef)
? {
??? super(paramRemoteRef);
? }

? public String en2ch(String paramString)
??? throws RemoteException
? {
??? try
??? {
????? Object localObject = this.ref.invoke(this,$method_en2ch_0,new Object[] { paramString },-3926061110445064938L);
????? return (String)localObject;
??? }
??? catch (RuntimeException localRuntimeException)
??? {
????? throw localRuntimeException;
??? }
??? catch (RemoteException localRemoteException)
??? {
????? throw localRemoteException;
??? }
??? catch (Exception localException)
??? {
??? }
??? throw new UnexpectedException("undeclared checked exception",localException);
? }
}

有所发现吗,你一定看到了这么几个词:reflect,RemoteStub,RemoteObject(这个序列化的时候在说说吧),不多说了,你懂的。

3.启动 start rmiregistry?? (在Server里面LocateRegistry.createRegistry(1099);可以不用这步)

4.添加Policy文件,启动RMIServer

translate.policy文件

grant {
??? permission java.security.AllPermission;
};


5.运行RMIClient



整个过程说简单也简单,说复杂也复杂。但是有两点要注意:

1.JDK帮我们做了很多工作,已经封装好了很多过程,比如数据怎么从客户端包装好,成为Stub,怎么通过Socket传到服务端,怎么形成Skeleton,怎么调用,然后将结果反向传回来给客户端。

2.RMI需要安全机制,现在默认采用Policy文件的形式。


没错,RMI整个过程就是数据通过Socket通信,完成过程调用。这个过程我们可以自己实现一个简单的版本吗,毕竟Socket我们也可以写。下面就介绍一个网上的简易版本。思路在注释里面已经明显了,就是获得接口的代理对象,通过ObjectOutputStream ?? writeUTF(Name)---writeObject(ParameterTypes)---writeObject(arguments),在服务端接收的时候,通过ObjectInputStream ?readUTF,readObject,readObject,然后通过反射invoke,得到调用结果。

1. RPC服务核心类

package framework;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.ServerSocket;
import java.net.Socket;

public class RpcFramework {

?? ?public static void export(final Object service,int port) throws Exception {
?? ??? ?if (service == null || port <= 0 || port > 65535)
?? ??? ??? ?throw new IllegalArgumentException("Null service or Wrong port");
?? ??? ?System.out.println("Export service " + service.getClass().getName()
?? ??? ??? ??? ?+ " on port " + port);
?? ??? ?ServerSocket server = new ServerSocket(port);
?? ??? ?for (;;) {
?? ??? ??? ?try {
?? ??? ??? ??? ?final Socket socket = server.accept();//服务器端一旦收到消息,就创建一个线程进行处理
?? ??? ??? ??? ?new Thread(new Runnable() {
?? ??? ??? ??? ??? ?@Override
?? ??? ??? ??? ??? ?public void run() {
?? ??? ??? ??? ??? ??? ?try {
?? ??? ??? ??? ??? ??? ??? ?try {
?? ??? ??? ??? ??? ??? ??? ??? ?ObjectInputStream input = new ObjectInputStream(
?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ?socket.getInputStream());
?? ??? ??? ??? ??? ??? ??? ??? ?try {
?? ??? ??? ??? ??? ??? ??? ??? ??? ?String methodName = input.readUTF();// service是服务器端提供服务的对象,但是,要通过获取到的调用方法的名称,参数类型,以及参数来选择对象的方法,并调用。获得方法的名称
?? ??? ??? ??? ??? ??? ??? ??? ??? ?Class<?>[] parameterTypes = (Class<?>[]) input
?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ?.readObject();// 获得参数的类型
?? ??? ??? ??? ??? ??? ??? ??? ??? ?Object[] arguments = (Object[]) input
?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ?.readObject();// 获得参数
?? ??? ??? ??? ??? ??? ??? ??? ??? ?ObjectOutputStream output = new ObjectOutputStream(
?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ?socket.getOutputStream());
?? ??? ??? ??? ??? ??? ??? ??? ??? ?try {
?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ?Method method = service.getClass()
?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ?.getMethod(methodName,
?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ?parameterTypes);// 通过反射机制获得方法
?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ?Object result = method.invoke(service,
?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ?arguments);// 通过反射机制获得类的方法,并调用这个方法
?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ?output.writeObject(result);// 将结果发送
?? ??? ??? ??? ??? ??? ??? ??? ??? ?} catch (Throwable t) {
?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ?output.writeObject(t);
?? ??? ??? ??? ??? ??? ??? ??? ??? ?} finally {
?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ?output.close();
?? ??? ??? ??? ??? ??? ??? ??? ??? ?}
?? ??? ??? ??? ??? ??? ??? ??? ?} finally {
?? ??? ??? ??? ??? ??? ??? ??? ??? ?input.close();
?? ??? ??? ??? ??? ??? ??? ??? ?}
?? ??? ??? ??? ??? ??? ??? ?} finally {
?? ??? ??? ??? ??? ??? ??? ??? ?socket.close();
?? ??? ??? ??? ??? ??? ??? ?}
?? ??? ??? ??? ??? ??? ?} catch (Exception e) {
?? ??? ??? ??? ??? ??? ??? ?e.printStackTrace();
?? ??? ??? ??? ??? ??? ?}
?? ??? ??? ??? ??? ?}
?? ??? ??? ??? ?}).start();
?? ??? ??? ?} catch (Exception e) {
?? ??? ??? ??? ?e.printStackTrace();
?? ??? ??? ?}
?? ??? ?}
?? ?}

?? ?/**
?? ? * 引用服务
?? ? *
?? ? * @param <T>
?? ? *??????????? 接口泛型
?? ? * @param interfaceClass
?? ? *??????????? 接口类型
?? ? * @param host
?? ? *??????????? 服务器主机名
?? ? * @param port
?? ? *??????????? 服务器端口
?? ? * @return 远程服务
?? ? * @throws Exception
?? ? */
?? ?// 原理是通过代理,获得服务器端接口的一个“代理”的对象。对这个对象的所有操作都会调用invoke函数,在invoke函数中,是将被调用的函数名,参数列表和参数发送到服务器,并接收服务器处理的结果
?? ?@SuppressWarnings("unchecked")
?? ?public static <T> T refer(final Class<T> interfaceClass,final String host,
?? ??? ??? ?final int port) throws Exception {
?? ??? ?if (interfaceClass == null)
?? ??? ??? ?throw new IllegalArgumentException("Interface class == null");
?? ??? ?if (!interfaceClass.isInterface())
?? ??? ??? ?throw new IllegalArgumentException("The "
?? ??? ??? ??? ??? ?+ interfaceClass.getName() + " must be interface class!");
?? ??? ?if (host == null || host.length() == 0)
?? ??? ??? ?throw new IllegalArgumentException("Host == null!");
?? ??? ?if (port <= 0 || port > 65535)
?? ??? ??? ?throw new IllegalArgumentException("Invalid port " + port);
?? ??? ?System.out.println("Get remote service " + interfaceClass.getName()
?? ??? ??? ??? ?+ " from server " + host + ":" + port);
?? ??? ?return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
?? ??? ??? ??? ?new Class<?>[] { interfaceClass },new InvocationHandler() {
?? ??? ??? ??? ??? ?public Object invoke(Object proxy,Method method,
?? ??? ??? ??? ??? ??? ??? ?Object[] arguments) throws Throwable {
?? ??? ??? ??? ??? ??? ?Socket socket = new Socket(host,port);
?? ??? ??? ??? ??? ??? ?try {
?? ??? ??? ??? ??? ??? ??? ?ObjectOutputStream output = new ObjectOutputStream(
?? ??? ??? ??? ??? ??? ??? ??? ??? ?socket.getOutputStream());
?? ??? ??? ??? ??? ??? ??? ?try {
?? ??? ??? ??? ??? ??? ??? ??? ?output.writeUTF(method.getName());
?? ??? ??? ??? ??? ??? ??? ??? ?output.writeObject(method.getParameterTypes());
?? ??? ??? ??? ??? ??? ??? ??? ?output.writeObject(arguments);
?? ??? ??? ??? ??? ??? ??? ??? ?ObjectInputStream input = new ObjectInputStream(
?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ?socket.getInputStream());
?? ??? ??? ??? ??? ??? ??? ??? ?try {
?? ??? ??? ??? ??? ??? ??? ??? ??? ?Object result = input.readObject();
?? ??? ??? ??? ??? ??? ??? ??? ??? ?if (result instanceof Throwable) {
?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ?throw (Throwable) result;
?? ??? ??? ??? ??? ??? ??? ??? ??? ?}
?? ??? ??? ??? ??? ??? ??? ??? ??? ?return result;
?? ??? ??? ??? ??? ??? ??? ??? ?} finally {
?? ??? ??? ??? ??? ??? ??? ??? ??? ?input.close();
?? ??? ??? ??? ??? ??? ??? ??? ?}
?? ??? ??? ??? ??? ??? ??? ?} finally {
?? ??? ??? ??? ??? ??? ??? ??? ?output.close();
?? ??? ??? ??? ??? ??? ??? ?}
?? ??? ??? ??? ??? ??? ?} finally {
?? ??? ??? ??? ??? ??? ??? ?socket.close();
?? ??? ??? ??? ??? ??? ?}
?? ??? ??? ??? ??? ?}
?? ??? ??? ??? ?});
?? ?}


}

2.? 业务类

package service;
public interface? HelloService { ?
??? String hello(User name); ?
}

package service;
public class HelloServiceImpl implements HelloService{ ?
??? public String hello(User name) { ?
??????? return "Hello " + name.getName(); ?
??? }?
}


public class User {
?? ?private int id;
?? ?private String name;
?? ?
?? ?public User(int id,String name) {
?? ??? ?super();
?? ??? ?this.id = id;
?? ??? ?this.name = name;
?? ?}

?? 。。。。//getter setter

}

3. 测试调用:

package service;

import framework.RpcFramework;

public class Server { ?
??? public static void main(String []args) throws Exception { ?
??????? HelloService service = new HelloServiceImpl(); ?
??????? RpcFramework.export(service,1234);? ?
??? }
}


package service;

import framework.RpcFramework;
public class Client { ?
??? public static void main(String[] args) throws Exception {?? ?
??????? HelloService service = RpcFramework.refer(HelloService.class,"127.0.0.1",1234);?? ?
??????? for (int i = 0; i < 100; i ++) { ?
??????????? String hello = service.hello(new User(i,"World" + i));?? ?
??????????? System.out.println(hello);?? ?
??????????? Thread.sleep(1000);?? ?
??????? }
??? }
}

运行的时候先启动Server,然后启动Client.

不出意外,应该会出现错误(服务端和客户端都有):

Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
?? ?at com.sun.proxy.$Proxy0.hello(Unknown Source)
?? ?at service.Client.main(Client.java:9)
Caused by: java.io.NotSerializableException: service.User
?? ?at java.io.ObjectOutputStream.writeObject0(Unknown Source)


看到这里,解决这个问题也是很容易的。(最上面那个ITranslate中也一样,如果参数为User,也是会出现异常的,下面是我测试的结果)

java.rmi.MarshalException: error marshalling arguments; nested exception is:
?? ?java.io.NotSerializableException: org.test.rmi.User
?? ?at sun.rmi.server.UnicastRef.invoke(UnicastRef.java:157)
?? ?at java.rmi.server.RemoteObjectInvocationHandler.invokeRemoteMethod(RemoteObjectInvocationHandler.java:194)
?? ?at java.rmi.server.RemoteObjectInvocationHandler.invoke(RemoteObjectInvocationHandler.java:148)
?? ?at com.sun.proxy.$Proxy0.en2ch(Unknown Source)????

(看懂Proxy,invoke之类的很兴奋吧,有木有感觉和我们刚刚实现的例子很相似?!可以去了解下动态代理怎么实现。)

觉得上面的错误就是将传输的对象进行序列化:

public class User implements java.io.Serializable{}

为什么要做这一步呢?且听下回分解。

数据处理---Java数据处理之序列化

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读