`
echozhjun
  • 浏览: 47659 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

java网络编程学习笔记

阅读更多

学了一个星期的java网络编程了。现在来总结一下。

基础:java.net

InetAddress 类是表示 IPInternet 协议)地址的抽象。它拥有两个子类: 用于 IPv4 地址的 Inet4Address 用于 IPv6 地址的 Inet6Address 但是,在大多数情况下,不必直接处理子类,因为 InetAddress 抽象应该覆盖大多数必需的功能。常用的IP和域名之间的相互转换。
Socket  TCP 客户端 API,通常用于连接远程主机。 
ServerSocket  TCP 服务器 API,通常接受源于客户端套接字的连接。 
DatagramSocket  UDP 端点 API,用于发送和接收数据包 
MulticastSocket  DatagramSocket 的子类,在处理多播组时使用。 
使用 TCP 套接字的发送和接收操作需要借助 InputStream  OutputStream 来完成,这两者是通过 Socket.getInputStream()  Socket.getOutputStream() 方法获取的。
URI 是表示在 RFC 2396 中指定的统一资料标识符的类。顾名思义,它只是一个标识符,不直接提供访问资源的方法。 
URL 是表示统一资源定位符的类,它既是 URI 的旧式概念又是访问资源的方法。 
URLConnection 是根据 URL 创建的,是用于访问 URL 所指向资源的通信链接。此抽象类将大多数工作委托给底层协议处理程序,如 http  ftp 
HttpURLConnection  URLConnection 的子类,提供一些特定于 HTTP 协议的附加功能。 建议的用法是使用 URI 指定资源,然后在访问资源时将其转换为 URL。从该 URL 可以获取 URLConnection 以进行良好控制,也可以直接获取 InputStream
 

进阶:java.nio

学习此包,首先要理解非阻塞通信的含义。简单介绍一下就是:只创建一个线程,利用类似统筹原理的思想,去处理多个并发的任务。

ServerSocketChannel ServerSocket 的替代类, 支持阻塞通信与非阻塞通信。

SocketChannel Socket 的替代类, 支持阻塞通信与非阻塞通信。

Selector ServerSocketChannel 监控接收连接就绪事件, SocketChannel 监控连接就绪, 读就绪和写就绪事件。

SelectionKey 代表 ServerSocketChannel SocketChannel Selector 注册事件的句柄。 当一个 SelectionKey 对象位于Selector 对象的 selected-keys 集合中时, 就表示与这个 SelectionKey 对象相关的事件发生了。  

Buffer类,重要的缓冲数据结构。其子类包括:ByteBufferCharBufferIntBuffer等,所有的缓冲区都有以下属性:

  • 容量(capacity) 表示该缓冲区可以保存多少数据。
  • 极限(limit) 表示缓冲区的当前终点, 不能对缓冲区中超过极限的区域进行读写操作。 极限是可以修改的, 这有利于缓冲区的重用。 例如, 假定容量100 的缓冲区已经填满了数据, 接着程序在重用缓冲区时, 仅仅将 10 个新的数据写入缓冲区中从位置0 10 的区域, 这时可以将极限设为 10 这样就不能读取先前的数据了。 极限是一个非负整数, 不应该大于容量。
  • 位置(position) 表示缓冲区中下一个读写单元的位置, 每次读写缓冲区的数据时, 都会改变该值, 为下一次读写数据作准备。 位置是一个非负整数, 不应该大于极限。

Charset 类提供了编码与解码的方法:

  • ByteBuffer encode(String str) 对参数 Str 指定的字符串进行编码, 把得到的字节序列存放在一个 ByteBuffer 对象中, 并将其返回;
  • ByteBuffer encode(CharBuffer cb) 对参数 cb 指定的字符缓冲区中的字符进行编码,把得到的字节序列存放在一个 ByteBuffer 对象中, 并将其返回;
  • CharBuffer decode(ByteBuffer bb): 把参数 bb 指定的 ByteBuffer 中的字节序列进行解码, 把得到的字符序列存放在一个 CharBuffer 对象中, 并将其返回。

      Charset 类的静态 forName(String encode) 方法返回一个 Charset 对象, 它代表参数 encode 指定的编码类型。 

SelectableChannel 类是一种支持阻塞 I/O 和非阻塞 I/O 的通道。 在非阻塞模式下, 读写数据不会阻塞, 并且SelectableChannel 可以向 Selector 注册读就绪和写就绪等事件。 Selector 负责监控这些事件, 等到事件发生时, 比如发生了读就绪事件, SelectableChannel 就可以执行读操作了。SelectableChannel 的主要方法如下:

  • public SelecotableChannel configureBlocking(boolean block) throws IOException

     block true 时, 表示把 SelectableChannel 设为阻塞模式; 如果参数block false 表示把 SelectableChannel 设为非阻塞模式。 默认情况下, SelectableChannel 采用阻塞模式。 该方法返回 SelectableChannel 对象本身的引用, 相当于" return this"

  • public SelectionKey register(Selector sel, int ops) throws ClosedChannelException
  • public SelectionKey register(Selector sel, int ops, Object attachment) throws ClosedChannelException

SeverSocketChannel SeletableChannel 中继承了 configureBlocking() register()方法。 ServerSocketChannel ServerSocket 的替换类, 也具有负责接收客户连接的 accept() 方法。 ServerSocket 并没有 public 类型的构造方法, 必须通过它的静态方法open() 来创建 ServerSocketChannel 对象。 每个ServerSocketChannel 对象都与一个ServerSocket 对象关联。 ServerSocketChannel socket() 方法返回与它关联的 ServerSocket 对象。 可通过以下方法把服务器进程绑定到一个本地端口:

           serverSocketChannel.socket().bind(port)                                                          

       ServerSocketChannel 的主要方法如下:

  • public static ServerSocketChannel open() throws IOException

     这是 ServerSocketChannel 类的静态工厂方法, 它返回一个 ServerSocketChannel 对象, 这个对象没有与任何本地端口绑定, 并且处于阻塞模式。

  • public SocketChannel accept() throws IOException

      类似于 ServerSocket accept() 方法, 用于接收客户的连接。 如果 ServerSocketChannel 处于非阻塞状态, 当没有客户连接时, 该方法立即返回 null 如果ServerSocketChannel 处于阻塞状态, 当没有客户连接时, 它会一直阻塞下去, 直到有客户连接就绪, 或者出现了IOException

      值得注意的是, 该方法返回的 SocketChannel 对象处于阻塞模式, 如果希望把它改为非阻塞模式, 必须执行以下代码:

        socketChannel.configureBlocking(false)                                       

  • public final int validOps()

     返回 ServerSocketChannel 所能产生的事件, 这个方法总是返回 SelectionKey.OP_ACCEPT

  • public ServerSocket socket()

     返回与 ServerSocketChannel 关联的 ServerSocket 对象。 每个 ServerSocketChannel 对象都与一个 ServerSocket 对象关联。

SocketChannel 可看作是 Socket 的替代类, 但它比 Socket 具有更多的功能。 SocketChannel 不仅从 SelectableChannel 父类中继承了 configureBlocking() register() 方法, 并且实现了 ByteChannel 接口, 因此具有用于读写数据的 read(ByteBuffer dst) write(ByteBuffer src) 方法。 SocketChannel 没有public 类型的构造方法, 必须通过它的静态方法open() 来创建 SocketChannel 对象。

Selector 类,只要 ServerSocketChannel SocketChannel Selector 注册了特定的事件, Selector 就会监控这些事件是否发生。 SelectableChannel register() 方法负责注册事件, 该方法返回一个SelectionKey 对象, 该对象是用于跟踪这些被注册事件的句柄。 一个Selector 对象中会包含 3 种类型的 SelectionKey 集合。

  • all-keys 集合: 当前所有向Selector 注册的 SelectionKey 的集合, Selector keys() 方法返回该集合。
  • selected-keys 集合: 相关时间已经被Selector 捕获的SelectionKey 的集合。 Selector selectedKeys() 方法返回该集合。
  • cancelled-keys 集合: 已经被取消的 SelectionKey 的集合。 Selector 没有提供访问这种集合的方法。

扩展:java.util.concurrent包和Future 模式

前一段时间学习了一下线程池技术,其实java本身的这个包可以说已经实现的非常好了。据说是一位牛人:Doug Lee写出来的。写的很好,所以就被sun加入java了。能写出这么优秀的东西,真是不服不行呀。

java.util.concurrent包分成了三个部分,分别是java.util.concurrentjava.util.concurrent.atomicjava.util.concurrent.lock。内容涵盖了并发集合类、线程池机制、同步互斥机制、线程安全的变量更新工具类、锁等等常用工具。

Executors通过这个类能够获得多种线程池的实例,例如可以调用newSingleThreadExecutor()获得单线程的ExecutorService,调用newFixedThreadPool()获得固定大小线程池的ExecutorService。拿到ExecutorService可以做的事情就比较多了,最简单的是用它来执行Runnable对象,也可以执行一些实现了Callable<T>的对象。用Threadstart()方法没有返回值,如果该线程执行的方法有返回值那用ExecutorService就再好不过了,可以选择submit()invokeAll()或者invokeAny(),根据具体情况选择合适的方法即可。

Lock多线程编程中常常要锁定某个对象,之前会用synchronized来实现,现在又多了另一种选择,那就是java.util.concurrent.locks。通过Lock能够实现更灵活的锁定机制,它还提供了很多synchronized所没有的功能,例如尝试获得锁(tryLock())

使用Lock时需要自己获得锁并在使用后手动释放,这一点与synchronized有所不同,所以通常Lock的使用方式是这样的:

 

<!--<br /> <br /> Code highlighting produced by Actipro CodeHighlighter (freeware)<br /> http://www.CodeHighlighter.com/<br /> <br /> -->Lock l = ;   
l.lock();  
try {  
    
// 执行操作  
finally {  
    l.unlock();  
}  

 

java.util.concurrent.locks中提供了几个Lock接口的实现类,比较常用的应该是ReentrantLock

Conditon代替了Object对象上的wait()notify()notifyAll()方法(Condition中提供了await()signal()signalAll()方法),当满足运行条件前挂起线程。Condition是与Lock结合使用的,通过Lock.newCondition()方法能够创建与Lock绑定的Condition实例。

AtomicInteger对变量的读写操作都是原子操作(除了long或者double的变量),但像数值类型的++ --操作不是原子操作,像i++中包含了获得i的原始值、加1、写回i、返回原始值,在进行类似i++这样的操作时如果不进行同步问题就大了。好在java.util.concurrent.atomic为我们提供了很多工具类,可以以原子方式更新变量。

AtomicInteger为例,提供了代替++ --getAndIncrement()incrementAndGet()getAndDecrement()decrementAndGet()方法,还有加减给定值的方法、当前值等于预期值时更新的compareAndSet()方法。

CountDownLatch是一个一次性的同步辅助工具,允许一个或多个线程一直等待,直到计数器值变为0。它有一个构造方法,设定计数器初始值,即在await()结束等待前需要调用多少次countDown()方法。CountDownLatch的计数器不能重置,所以说它是一次性的,如果需要重置计数器,可以使用CyclicBarrier

 

设计模式除了GOF23中经典模式以外,还有很多很实用的模式。比如这个Future模式,在多线程编程中就会常常用到。这个模式对于我们搞金融软件的人就比较容易理解了。炒期货的人也很容易的理解。

举个服务器和客户端交互的例子:

1Server先给Client一个期权,同时开一个线程去干活建房子(未来的现房);

2)当现房”RealData准备好了的时候,如何告诉FutureData说已经准备好了。(采用回调过程(借用观察者模式,来实现回调))

3)如果客户比较着急,现房还没准备好的时候,就要取房,怎么办?那就只能阻塞了。

 

目标:Apache Mina

前面说了这么多,都是为了学习mina做准备。因为mina就是基于java.niojava.util.concurrentFuture模式等的。

Mina的官方介绍:

MINA(Multipurpose Infrastructure for Network Applications)是用于开发高性能和高可用性的网络应用程序的基础框架。通过使用MINA框架可以可以省下处理底层I/O和线程并发等复杂工作,开发人员能够把更多的精力投入到业务设计和开发当中。MINA框架的应用比较广泛,应用的开源项目有Apache DirectoryAsyncWebApache QpidQuickFIX/JOpenfireSubEthaSTMPred5等。MINA框架当前稳定版本是1.1.6,最新的2.0版本目前已经发布了M1版本。 
MINA框架的特点有:基于java NIO类库开发;采用非阻塞方式的异步传输;事件驱动;支持批量数据传输;支持TCPUDP协议;控制反转的设计模式(支持Spring);采用优雅的松耦合架构;可灵活的加载过滤器机制;单元测试更容易实现;可自定义线程的数量,以提高运行于多处理器上的性能;采用回调的方式完成调用,线程的使用更容易。

要理解MINA有一张图片是不能错过的:

要深入学习MINA,有篇教程也是不能错过的:

Mina2.0框架源码剖析.doc

关于MINA,前人已经写了太多的东西了。那么上我的作品吧,在看了上面的教程之后,结合MINA源码,写了一个简单的Http代理程序。

 

 

Code
<!--<br /> <br /> Code highlighting produced by Actipro CodeHighlighter (freeware)<br /> http://www.CodeHighlighter.com/<br /> <br /> -->package anotherproxy;

import java.net.InetSocketAddress;

import org.apache.mina.core.service.IoConnector;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

public class Main {

    
public static void main(String[] args) throws Exception {
        NioSocketAcceptor acceptor 
= new NioSocketAcceptor();
        acceptor.setReuseAddress(
true);
        IoConnector connector 
= new NioSocketConnector();
        connector.setConnectTimeoutMillis(ProxyConstants.CONNECTTIMEOUT);
        ClientToProxyIoHandler handler 
= new ClientToProxyIoHandler(connector);
        acceptor.setHandler(handler);
        acceptor.setBacklog(
100);
        IEProxy.on();
        acceptor.bind(
new InetSocketAddress(ProxyConstants.PROXYPORT));
        System.out.println(
"Listening on port " + ProxyConstants.PROXYPORT);
    }

}

 

Code
<!--<br /> <br /> Code highlighting produced by Actipro CodeHighlighter (freeware)<br /> http://www.CodeHighlighter.com/<br /> <br /> -->package anotherproxy;

import java.io.IOException;
import java.nio.charset.Charset;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;

public abstract class AbstractProxyIoHandler extends IoHandlerAdapter {
    
protected static final Charset CHARSET = Charset.forName("utf-8");
    
public static final String OTHER_IO_SESSION = AbstractProxyIoHandler.class.getName()
            
+ ".OtherIoSession";
    @Override
    
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        
if (cause instanceof IOException) {
            
return;
        }
        cause.printStackTrace();
    }
    
    @Override
    
public void sessionOpened(IoSession session) {
        session.getConfig().setIdleTime(IdleStatus.READER_IDLE, 
5);
    }
    
    @Override
    
public void sessionIdle(IoSession session, IdleStatus status) {
        
if (status == IdleStatus.READER_IDLE) {
            session.close(
true);
        }
    }

    @Override
    
public void sessionClosed(IoSession session) throws Exception {
        
if (session.getAttribute( OTHER_IO_SESSION ) != null) {
            IoSession sess 
= (IoSession) session.getAttribute(OTHER_IO_SESSION);
            sess.close(
false);
            sess.setAttribute(OTHER_IO_SESSION, 
null);
            session.setAttribute(OTHER_IO_SESSION, 
null);
        }
    }

    
}

 

Code
<!--<br /> <br /> Code highlighting produced by Actipro CodeHighlighter (freeware)<br /> http://www.CodeHighlighter.com/<br /> <br /> -->package anotherproxy;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.session.IoSession;

public class ClientToProxyIoHandler extends AbstractProxyIoHandler {
    
private final ServerToProxyIoHandler connectorHandler = new ServerToProxyIoHandler();

    
private final IoConnector connector;

    
private SocketAddress remoteAddress;

    
public ClientToProxyIoHandler(IoConnector connector) {
        
this.connector = connector;
        
this.connector.setHandler(connectorHandler);
    }

    @Override
    
public void messageReceived(final IoSession session, Object message) throws Exception {
        
final IoBuffer rb = (IoBuffer) message;
        String request 
= rb.getString(CHARSET.newDecoder());
        System.out.println(
"Send*********************************");
        System.out.println(
"session " + session.getId());
        System.out.println(request);
        Pattern pattern 
= Pattern.compile("Host: (.*)");
        Matcher matcher 
= pattern.matcher(request);
        String host;
        
if (matcher.find()) {
            host 
= matcher.group(1);
        } 
else {
            session.close(
true);
            
return;
        }
        remoteAddress 
= new InetSocketAddress(InetAddress.getByName(host), 80);
        connector.connect(remoteAddress).addListener(
new IoFutureListener<ConnectFuture>() {

            @Override
            
public void operationComplete(ConnectFuture future) {
                
if (future.isConnected()) {
                    future.getSession().setAttribute(OTHER_IO_SESSION, session);
                    session.setAttribute(OTHER_IO_SESSION, future.getSession());
                    rb.flip();
                    future.getSession().write(rb);
                    System.out.println(
"server session:" + future.getSession().getId()
                            
+ " client session:" + session.getId());
                }
            }
        });
    }

}

 

Code
<!--<br /> <br /> Code highlighting produced by Actipro CodeHighlighter (freeware)<br /> http://www.CodeHighlighter.com/<br /> <br /> -->package anotherproxy;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.session.IoSession;

public class ServerToProxyIoHandler extends AbstractProxyIoHandler {
    
    @Override
    
public void messageReceived(IoSession session, Object message) throws Exception {
        IoBuffer rb 
= (IoBuffer) message;
        IoBuffer wb 
= IoBuffer.allocate(rb.remaining());
        rb.mark();
        wb.put(rb);
        wb.flip();
        WriteFuture writeFuture 
= ((IoSession) session.getAttribute(OTHER_IO_SESSION)).write(wb);
        writeFuture.awaitUninterruptibly();
        System.out.println(
"****************Received*****************");
        System.out.println(
"session " + session.getId());
        System.out.println(wb.getString(CHARSET.newDecoder()));
        rb.reset();
    }
}

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics