`

commons-pool连接池的实现应用

阅读更多

一、描述: 

apache commons-pool本质上是"对象池",即通过一定的规则来维护对象集合的容器;commons-pool用来实现"连接池"或"任务池"等,dbcp也是基于commons-pool实现。

二、commons-pool实现思路:

将"对象集合"池化,任何通过pool进行对象存取的操作,都会严格按照"pool配置"实时地创建对象、进行阻塞控制、销毁对象等,从而实现对象集合的管理以及对象的分发。

a.将创建对象的方式,使用工厂模式;

b.通过"pool配置"来约束对象存取的时机

c.将对象列表保存在队列中(LinkedList)

三、实例演示(FTP连接池管理):

/**
 * Thin, type-safe wrapper around a commons-pool {@code GenericObjectPool}.
 *
 * <p>Subclasses supply the pool configuration and the factory that creates,
 * validates and destroys the pooled objects. The checked {@code Exception}
 * declared by the underlying pool operations is converted to an unchecked
 * {@link IllegalStateException} (with the original exception as its cause) so
 * that callers are not forced to declare {@code throws Exception}.
 *
 * @param <T> type of the pooled object
 */
public abstract class AbstractObjectPool<T> {

    private final GenericObjectPool<T> internalPool;

    /**
     * Creates the backing pool.
     *
     * @param poolConfig pool configuration passed to the backing pool
     * @param factory    factory the pool calls back into to make, validate and destroy objects
     */
    public AbstractObjectPool(GenericObjectPool.Config poolConfig, PoolableObjectFactory<T> factory) {
        this.internalPool = new GenericObjectPool<T>(factory, poolConfig);
    }

    /**
     * Borrows an object from the pool.
     *
     * @return a pooled object; hand it back with {@link #returnResource(Object)}
     * @throws IllegalStateException if the pool fails with a checked exception
     *                               while obtaining the object
     */
    public T getResource() {
        try {
            return this.internalPool.borrowObject();
        } catch (RuntimeException e) {
            // Runtime failures raised by the pool are propagated unchanged.
            throw e;
        } catch (Exception e) {
            throw new IllegalStateException("Could not get a resource from the pool", e);
        }
    }

    /**
     * Returns a previously borrowed object to the pool.
     *
     * @param resource object obtained from {@link #getResource()}
     * @throws IllegalStateException if the pool fails with a checked exception
     *                               while taking the object back
     */
    public void returnResource(T resource) {
        try {
            this.internalPool.returnObject(resource);
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new IllegalStateException("Could not return the resource to the pool", e);
        }
    }

    /**
     * Closes the backing pool.
     *
     * @throws IllegalStateException if closing the pool fails with a checked exception
     */
    public void destroy() {
        try {
            this.internalPool.close();
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new IllegalStateException("Could not destroy the pool", e);
        }
    }

    /**
     * @return the backing pool's active count, i.e. objects currently borrowed
     */
    public int borrowSize() {
        return this.internalPool.getNumActive();
    }

    /**
     * @return the backing pool's idle count, i.e. objects currently sitting in the pool
     */
    public int inPoolSize() {
        return this.internalPool.getNumIdle();
    }

}

 FTPPool.java继承了抽象类AbstractObjectPool.java,在构造方法中实例化FTPPoolableObjectFactory,处理GenericObjectPool中的操作。

public class FTPPool extends<AbstractObjectPool>{

   public FTPPool(GenericObjectPool.Config poolConfig,String host,int port,String user,String password,String passiveModeConf){
         
     super(poolConfig,new FTPPoolableObjectFactort(host, port, user, password, passiveModeConf));
   }

}

 FTPPoolableObjectFactory.java继承BasePoolableObjectFactory,其中的方法分别为:

GenericObjectPool内部会回调makeObject创建对象

GenericObjectPool内部会回调destroyObject销毁对象

GenericObjectPool内部会回调validateObject检验对象

/**
 * Pool factory that creates, validates and destroys logged-in {@link FTPClient}
 * connections.
 *
 * <p>The pool calls back into {@link #makeObject()} to create a connection,
 * {@link #validateObject(FTPClient)} to check one, and
 * {@link #destroyObject(FTPClient)} to dispose of one.
 *
 * <p>Control encoding, transfer type and socket timeout are fixed defaults
 * defined as constants below; they are not configurable through the
 * constructor.
 */
public class FTPPoolableObjectFactory extends BasePoolableObjectFactory<FTPClient> {

    /** Socket read timeout applied to every new connection, in milliseconds. */
    private static final int CLIENT_TIMEOUT = 60 * 1000;

    /** Character set used on the FTP control connection. */
    private static final String CONTROL_ENCODING = "UTF-8";

    /** Whether files are transferred in binary (true) or ASCII (false) mode. */
    private static final boolean BINARY_TRANSFER = true;

    private final String host;
    private final int port;
    private final String user;
    private final String password;
    /** Parsed from the constructor's {@code passiveModeConf}; "true" (any case) enables it. */
    private final boolean passiveMode;

    /**
     * @param host            FTP server host
     * @param port            FTP server port
     * @param user            login user name
     * @param password        login password
     * @param passiveModeConf {@code "true"} (case-insensitive) to use local passive mode;
     *                        any other value, including {@code null}, disables it
     */
    public FTPPoolableObjectFactory(String host, int port, String user, String password,
                                    String passiveModeConf) {
        this.host = host;
        this.port = port;
        this.user = user;
        this.password = password;
        this.passiveMode = Boolean.parseBoolean(passiveModeConf);
    }

    /**
     * Creates a new connected, logged-in and configured client.
     *
     * @return a ready-to-use client
     * @throws Exception if the connection cannot be established or configured
     */
    @Override
    public FTPClient makeObject() throws Exception {
        return getFTPClient();
    }

    /**
     * Logs out and disconnects the given client.
     *
     * @param client client to dispose of; {@code null} is ignored
     * @throws Exception if logging out or disconnecting fails
     */
    @Override
    public void destroyObject(FTPClient client) throws Exception {
        if (client != null) {
            disconnect(client);
        }
    }

    /**
     * Checks that the client is still usable.
     *
     * <p>{@code isConnected()} is checked first, then a NOOP is sent so the
     * check involves an actual exchange with the server.
     *
     * @param obj client to check
     * @return {@code true} if the client is connected and the NOOP succeeded
     */
    @Override
    public boolean validateObject(FTPClient obj) {
        if (obj == null || !obj.isConnected()) {
            return false;
        }
        try {
            return obj.sendNoOp();
        } catch (IOException ignored) {
            // A failed NOOP means the connection is unusable; report it as invalid
            // rather than propagating the I/O error to the caller.
            return false;
        }
    }

    /** Applies the configured transfer type (binary or ASCII) to the client. */
    private void setFileType(FTPClient ftpClient) throws FTPClientException {
        int fileType = BINARY_TRANSFER ? FTPClient.BINARY_FILE_TYPE : FTPClient.ASCII_FILE_TYPE;
        try {
            if (!ftpClient.setFileType(fileType)) {
                throw new FTPClientException("Could not to set file type.");
            }
        } catch (IOException e) {
            throw new FTPClientException("Could not to set file type.", e);
        }
    }

    /** Builds a client, connects, logs in and applies mode, file type and timeout. */
    private FTPClient getFTPClient() throws FTPClientException {
        FTPClient ftpClient = new FTPClient();
        ftpClient.setControlEncoding(CONTROL_ENCODING);
        connect(ftpClient);

        boolean ready = false;
        try {
            if (passiveMode) {
                ftpClient.enterLocalPassiveMode();
            }
            setFileType(ftpClient);
            ftpClient.setSoTimeout(CLIENT_TIMEOUT);
            ready = true;
            return ftpClient;
        } catch (SocketException e) {
            throw new FTPClientException("Set timeout error.", e);
        } finally {
            // Do not leak the open socket when configuration fails half-way.
            if (!ready) {
                closeQuietly(ftpClient);
            }
        }
    }

    /**
     * Connects to the server and logs in.
     *
     * @throws FTPClientException if the server refuses the connection, the login
     *                            is rejected, or an I/O error occurs
     */
    private void connect(FTPClient ftpClient) throws FTPClientException {
        boolean loggedIn = false;
        try {
            ftpClient.connect(host, port);
            // The reply code after connecting tells whether the server accepted us.
            if (!FTPReply.isPositiveCompletion(ftpClient.getReplyCode())) {
                throw new FTPClientException("FTP server refused connection.");
            }
            // The password is deliberately kept out of all messages.
            if (!ftpClient.login(user, password)) {
                throw new FTPClientException("FTP login failed for user " + user + ".");
            }
            loggedIn = true;
        } catch (IOException e) {
            throw new FTPClientException("Could not connect to server.", e);
        } finally {
            if (!loggedIn) {
                closeQuietly(ftpClient);
            }
        }
    }

    /**
     * Logs out and closes the connection. The disconnect is attempted even when
     * the logout fails; the first failure is the one reported.
     */
    private void disconnect(FTPClient ftpClient) throws FTPClientException {
        IOException failure = null;
        try {
            if (ftpClient.isConnected()) {
                ftpClient.logout();
            }
        } catch (IOException e) {
            failure = e;
        }
        try {
            if (ftpClient.isConnected()) {
                ftpClient.disconnect();
            }
        } catch (IOException e) {
            if (failure == null) {
                failure = e;
            }
        }
        if (failure != null) {
            throw new FTPClientException("Could not disconnect from server.", failure);
        }
    }

    /** Best-effort disconnect used on failure paths, where the original error matters more. */
    private static void closeQuietly(FTPClient ftpClient) {
        if (ftpClient.isConnected()) {
            try {
                ftpClient.disconnect();
            } catch (IOException ignored) {
                // Nothing useful can be done; the caller is already reporting a failure.
            }
        }
    }
}

 客户端测试方法:

/**
 * Demo client: borrows and returns an FTP connection eight times, printing the
 * pool's borrowed/idle counts before and after, plus the elapsed time.
 *
 * @param args unused
 * @throws Exception if pool or FTP operations fail
 */
public static void main(String[] args) throws Exception {
    // Pool configuration.
    GenericObjectPool.Config config = new GenericObjectPool.Config();

    // Nominal maximum number of objects borrowed at the same time.
    config.maxActive = 5;

    // When the pool is exhausted, create a new object instead of blocking or failing.
    // NOTE: with WHEN_EXHAUSTED_GROW, maxActive is effectively not a hard limit.
    config.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_GROW;

    // Maximum time in milliseconds to wait for an object when the pool is exhausted.
    // NOTE: only relevant for WHEN_EXHAUSTED_BLOCK; it has no effect with GROW.
    config.maxWait = 60 * 1000;

    // Validate on borrow (here: check that the FTP client is still connected).
    config.testOnBorrow = true;

    // Validate on return (here: check that the FTP client is still connected).
    config.testOnReturn = true;

    FTPPool pool = new FTPPool(config, "XXXXXX", 21, "xxxxxx", "xxxxxx", "true");
    try {
        System.out.println("borrowSize1:" + pool.borrowSize());
        System.out.println("inPoolSize1:" + pool.inPoolSize());
        long begin = System.currentTimeMillis();

        for (int i = 0; i < 8; i++) {
            FTPClient ftpClient = pool.getResource();
            try {
                System.out.println("ftpClient" + (i + 1) + " isConnected:" + ftpClient.isConnected());
            } finally {
                // Always hand the client back, even if using it failed.
                pool.returnResource(ftpClient);
            }
        }

        System.out.println("time:" + (System.currentTimeMillis() - begin));
        System.out.println("borrowSize2:" + pool.borrowSize());
        System.out.println("inPoolSize2:" + pool.inPoolSize());
    } finally {
        // Close the pool on every exit path so pooled connections are released.
        pool.destroy();
    }
}

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics