背景
在项目开发中,一般文件存储很少再使用SFTP服务,但是也不排除合作伙伴使用SFTP来存储项目中的文件或者通过SFTP来实现文件数据的交互。
我遇到的项目中,就有银行和保险公司等合作伙伴通过SFTP服务来实现与我们项目的文件数据的交互。
为了能够顺利地完成与友商的SFTP服务的连通,我们需要在自己的项目中实现一套SFTP客户端工具。一般我们会采用Jsch来实现SFTP客户端。
依赖
<!--执行远程操作--> <dependency> <groupId>com.jcraft</groupId> <artifactId>jsch</artifactId> <version>0.1.55</version> </dependency> <!--链接池--> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.11.1</version> </dependency>
首先我们一定要引入jsch
依赖,这个是我们实现SFTP客户端的基石;其次我们引入了链接池工具,为了避免每次执行SFTP命令都要重新创建链接,我们使用池化的方式优化了比较消耗资源的创建操作。
创建工具类
为了更好的使用SFTP工具,我们把jsch
中关于SFTP的相关功能提炼出来,做了一次简单的封装,做成了我们可以直接使用的工具类。
里面只有两类方法:
1.创建Session与开启Session;
session创建好后,还不能创建channel,需要开启session后才能创建channel;
2.创建channel与开启channel;
channel也是一样,创建好的channel需要开启后才能真正地执行命令;
public class JschUtil { /** * 创建session * * @param userName 用户名 * @param password 密码 * @param host 域名 * @param port 端口 * @param privateKeyFile 密钥文件 * @param passphrase 口令 * @return * @throws AwesomeException */ public static Session createSession(String userName, String password, String host, int port, String privateKeyFile, String passphrase) throws AwesomeException { return createSession(new JSch(), userName, password, host, port, privateKeyFile, passphrase); } /** * 创建session * * @param jSch * @param userName 用户名 * @param password 密码 * @param host 域名 * @param port 端口 * @param privateKeyFile 密钥 * @param passphrase 口令 * @return * @throws AwesomeException */ public static Session createSession(JSch jSch, String userName, String password, String host, int port, String privateKeyFile, String passphrase) throws AwesomeException { try { if (!StringUtils.isEmpty(privateKeyFile)) { // 使用密钥验证方式,密钥可以是有口令的密钥,也可以是没有口令的密钥 if (!StringUtils.isEmpty(passphrase)) { jSch.addIdentity(privateKeyFile, passphrase); } else { jSch.addIdentity(privateKeyFile); } } // 获取session Session session = jSch.getSession(userName, host, port); if (!StringUtils.isEmpty(password)) { session.setPassword(password); } // 不校验域名 session.setConfig("StrictHostKeyChecking", "no"); return session; } catch (Exception e) { throw new AwesomeException(500, "create session fail"); } } /** * 创建session * * @param jSch * @param userName 用户名 * @param password 密码 * @param host 域名 * @param port 端口 * @return * @throws AwesomeException */ public static Session createSession(JSch jSch, String userName, String password, String host, int port) throws AwesomeException { return createSession(jSch, userName, password, host, port, StringUtils.EMPTY, StringUtils.EMPTY); } /** * 创建session * * @param jSch * @param userName 用户名 * @param host 域名 * @param port 端口 * @return * @throws AwesomeException */ private Session createSession(JSch jSch, String userName, String host, int port) throws AwesomeException { return createSession(jSch, userName, StringUtils.EMPTY, host, port, StringUtils.EMPTY, StringUtils.EMPTY); } /** * 开启session链接 * * @param jSch * @param userName 用户名 * @param password 密码 * @param host 域名 * @param port 端口 * @param privateKeyFile 密钥 * @param passphrase 口令 * @param timeout 链接超时时间 * @return * @throws AwesomeException */ public static Session openSession(JSch jSch, String userName, String password, String host, int port, String privateKeyFile, String passphrase, int timeout) throws AwesomeException { Session session = createSession(jSch, userName, password, host, port, privateKeyFile, passphrase); try { if (timeout >= 0) { session.connect(timeout); } else { session.connect(); } return session; } catch (Exception e) { throw new AwesomeException(500, "session connect fail"); } } /** * 开启session链接 * * @param userName 用户名 * @param password 密码 * @param host 域名 * @param port 端口 * @param privateKeyFile 密钥 * @param passphrase 口令 * @param timeout 链接超时时间 * @return * @throws AwesomeException */ public static Session openSession(String userName, String password, String host, int port, String privateKeyFile, String passphrase, int timeout) throws AwesomeException { Session session = createSession(userName, password, host, port, privateKeyFile, passphrase); try { if (timeout >= 0) { session.connect(timeout); } else { session.connect(); } return session; } catch (Exception e) { throw new AwesomeException(500, "session connect fail"); } } /** * 开启session链接 * * @param jSch * @param userName 用户名 * @param password 密码 * @param host 域名 * @param port 端口 * @param timeout 链接超时时间 * @return * @throws AwesomeException */ public static Session openSession(JSch jSch, String userName, String password, String host, int port, int timeout) throws AwesomeException { return openSession(jSch, userName, password, host, port, StringUtils.EMPTY, StringUtils.EMPTY, timeout); } /** * 开启session链接 * * @param userName 用户名 * @param password 密码 * @param host 域名 * @param port 端口 * @param timeout 链接超时时间 * @return * @throws AwesomeException */ public static Session openSession(String userName, String password, String host, int port, int timeout) throws AwesomeException { return openSession(userName, password, host, port, StringUtils.EMPTY, StringUtils.EMPTY, timeout); } /** * 开启session链接 * * @param jSch * @param userName 用户名 * @param host 域名 * @param port 端口 * @param timeout 链接超时时间 * @return * @throws AwesomeException */ public static Session openSession(JSch jSch, String userName, String host, int port, int timeout) throws AwesomeException { return openSession(jSch, userName, StringUtils.EMPTY, host, port, StringUtils.EMPTY, StringUtils.EMPTY, timeout); } /** * 开启session链接 * * @param userName 用户名 * @param host 域名 * @param port 端口 * @param timeout 链接超时时间 * @return * @throws AwesomeException */ public static Session openSession(String userName, String host, int port, int timeout) throws AwesomeException { return openSession(userName, StringUtils.EMPTY, host, port, StringUtils.EMPTY, StringUtils.EMPTY, timeout); } /** * 创建指定通道 * * @param session * @param channelType * @return * @throws AwesomeException */ public static Channel createChannel(Session session, ChannelType channelType) throws AwesomeException { try { if (!session.isConnected()) { session.connect(); } return session.openChannel(channelType.getValue()); } catch (Exception e) { throw new AwesomeException(500, "open channel fail"); } } /** * 创建sftp通道 * * @param session * @return * @throws AwesomeException */ public static ChannelSftp createSftp(Session session) throws AwesomeException { return (ChannelSftp) createChannel(session, ChannelType.SFTP); } /** * 创建shell通道 * * @param session * @return * @throws AwesomeException */ public static ChannelShell createShell(Session session) throws AwesomeException { return (ChannelShell) createChannel(session, ChannelType.SHELL); } /** * 开启通道 * * @param session * @param channelType * @param timeout * @return * @throws AwesomeException */ public static Channel openChannel(Session session, ChannelType channelType, int timeout) throws AwesomeException { Channel channel = createChannel(session, channelType); try { if (timeout >= 0) { channel.connect(timeout); } else { channel.connect(); } return channel; } catch (Exception e) { throw new AwesomeException(500, "connect channel fail"); } } /** * 开启sftp通道 * * @param session * @param timeout * @return * @throws AwesomeException */ public static ChannelSftp openSftpChannel(Session session, int timeout) throws AwesomeException { return (ChannelSftp) openChannel(session, ChannelType.SFTP, timeout); } /** * 开启shell通道 * * @param session * @param timeout * @return * @throws AwesomeException */ public static ChannelShell openShellChannel(Session session, int timeout) throws AwesomeException { return (ChannelShell) openChannel(session, ChannelType.SHELL, timeout); } enum ChannelType { SESSION("session"), SHELL("shell"), EXEC("exec"), X11("x11"), AGENT_FORWARDING("[email protected]"), DIRECT_TCPIP("direct-tcpip"), FORWARDED_TCPIP("forwarded-tcpip"), SFTP("sftp"), SUBSYSTEM("subsystem"); private final String value; ChannelType(String value) { this.value = value; } public String getValue() { return this.value; } } }
SFTP链接池化
我们通过实现BasePooledObjectFactory
类来池化通道ChannelSftp
。这并不是真正池化的代码,下面的代码只是告知池化管理器如何创建对象和销毁对象。
static class SftpFactory extends BasePooledObjectFactory<ChannelSftp> implements AutoCloseable { private Session session; private SftpProperties properties; // 初始化SftpFactory // 里面主要是创建目标session,后续可用通过这个session不断地创建ChannelSftp。 SftpFactory(SftpProperties properties) throws AwesomeException { this.properties = properties; String username = properties.getUsername(); String password = properties.getPassword(); String host = properties.getHost(); int port = properties.getPort(); String privateKeyFile = properties.getPrivateKeyFile(); String passphrase = properties.getPassphrase(); session = JschUtil.createSession(username, password, host, port, privateKeyFile, passphrase); } // 销毁对象,主要是销毁ChannelSftp @Override public void destroyObject(PooledObject<ChannelSftp> p) throws Exception { p.getObject().disconnect(); } // 创建对象ChannelSftp @Override public ChannelSftp create() throws Exception { int timeout = properties.getTimeout(); return JschUtil.openSftpChannel(this.session, timeout); } // 包装创建出来的对象 @Override public PooledObject<ChannelSftp> wrap(ChannelSftp channelSftp) { return new DefaultPooledObject<>(channelSftp); } // 验证对象是否可用 @Override public boolean validateObject(PooledObject<ChannelSftp> p) { return p.getObject().isConnected(); } // 销毁资源,关闭session @Override public void close() throws Exception { if (Objects.nonNull(session)) { if (session.isConnected()) { session.disconnect(); } session = null; } } }
为了实现真正的池化操作,我们还需要以下代码:
1.我们需要在SftpClient对象中创建一个GenericObjectPool
对象池,这个才是真正的池子,它负责创建和存储所有的对象。
2.我们还需要提供资源销毁的功能,也就是实现AutoCloseable
,在服务停止时,需要把相关的资源销毁。
public class SftpClient implements AutoCloseable { private SftpFactory sftpFactory; GenericObjectPool<ChannelSftp> objectPool; // 构造方法1 public SftpClient(SftpProperties properties, GenericObjectPoolConfig<ChannelSftp> poolConfig) throws AwesomeException { this.sftpFactory = new SftpFactory(properties); objectPool = new GenericObjectPool<>(this.sftpFactory, poolConfig); } // 构造方法2 public SftpClient(SftpProperties properties) throws AwesomeException { this.sftpFactory = new SftpFactory(properties); SftpProperties.PoolConfig config = properties.getPool(); // 默认池化配置 if (Objects.isNull(config)) { objectPool = new GenericObjectPool<>(this.sftpFactory); } else { // 自定义池化配置 GenericObjectPoolConfig<ChannelSftp> poolConfig = new GenericObjectPoolConfig<>(); poolConfig.setMaxIdle(config.getMaxIdle()); poolConfig.setMaxTotal(config.getMaxTotal()); poolConfig.setMinIdle(config.getMinIdle()); poolConfig.setTestOnBorrow(config.isTestOnBorrow()); poolConfig.setTestOnCreate(config.isTestOnCreate()); poolConfig.setTestOnReturn(config.isTestOnReturn()); poolConfig.setTestWhileIdle(config.isTestWhileIdle()); poolConfig.setBlockWhenExhausted(config.isBlockWhenExhausted()); poolConfig.setMaxWait(Duration.ofMillis(config.getMaxWaitMillis())); poolConfig.setTimeBetweenEvictionRuns(Duration.ofMillis(config.getTimeBetweenEvictionRunsMillis())); objectPool = new GenericObjectPool<>(this.sftpFactory, poolConfig); } } // 销毁资源 @Override public void close() throws Exception { // 销毁链接池 if (Objects.nonNull(this.objectPool)) { if (!this.objectPool.isClosed()) { this.objectPool.close(); } } this.objectPool = null; // 销毁sftpFactory if (Objects.nonNull(this.sftpFactory)) { this.sftpFactory.close(); } } }
SFTP链接池的使用
我们已经对链接池进行了初始化,下面我们就可以从链接池中获取我们需要的ChannelSftp
来实现文件的上传下载了。
下面实现了多种文件上传和下载的方式:
1.直接把本地文件上传到SFTP服务器的指定路径;
2.把InputStream输入流提交到SFTP服务器指定路径中;
3.可以针对以上两种上传方式进行进度的监测;
4.把SFTP服务器中的指定文件下载到本地机器上;
5.把SFTP服务器˙中的文件写入指定的输出流;
6.针对以上两种下载方式,监测下载进度;
/** * 上传文件 * * @param srcFilePath * @param targetDir * @param targetFileName * @return * @throws AwesomeException */ public boolean uploadFile(String srcFilePath, String targetDir, String targetFileName) throws AwesomeException { return uploadFile(srcFilePath, targetDir, targetFileName, null); } /** * 上传文件 * * @param srcFilePath * @param targetDir * @param targetFileName * @param monitor * @return * @throws AwesomeException */ public boolean uploadFile(String srcFilePath, String targetDir, String targetFileName, SftpProgressMonitor monitor) throws AwesomeException { ChannelSftp channelSftp = null; try { // 从链接池获取对象 channelSftp = this.objectPool.borrowObject(); // 如果不存在目标文件夹 if (!exist(channelSftp, targetDir)) { mkdirs(channelSftp, targetDir); } channelSftp.cd(targetDir); // 上传文件 if (Objects.nonNull(monitor)) { channelSftp.put(srcFilePath, targetFileName, monitor); } else { channelSftp.put(srcFilePath, targetFileName); } return true; } catch (Exception e) { throw new AwesomeException(500, "upload file fail"); } finally { if (Objects.nonNull(channelSftp)) { // 返还对象给链接池 this.objectPool.returnObject(channelSftp); } } } /** * 上传文件到目标文件夹 * * @param in * @param targetDir * @param targetFileName * @return * @throws AwesomeException */ public boolean uploadFile(InputStream in, String targetDir, String targetFileName) throws AwesomeException { return uploadFile(in, targetDir, targetFileName, null); } /** * 上传文件,添加进度监视器 * * @param in * @param targetDir * @param targetFileName * @param monitor * @return * @throws AwesomeException */ public boolean uploadFile(InputStream in, String targetDir, String targetFileName, SftpProgressMonitor monitor) throws AwesomeException { ChannelSftp channelSftp = null; try { channelSftp = this.objectPool.borrowObject(); // 如果不存在目标文件夹 if (!exist(channelSftp, targetDir)) { mkdirs(channelSftp, targetDir); } channelSftp.cd(targetDir); if (Objects.nonNull(monitor)) { channelSftp.put(in, targetFileName, monitor); } else { channelSftp.put(in, targetFileName); } return true; } catch (Exception e) { throw new AwesomeException(500, "upload file fail"); } finally { if (Objects.nonNull(channelSftp)) { this.objectPool.returnObject(channelSftp); } } } /** * 下载文件 * * @param remoteFile * @param targetFilePath * @return * @throws AwesomeException */ public boolean downloadFile(String remoteFile, String targetFilePath) throws AwesomeException { return downloadFile(remoteFile, targetFilePath, null); } /** * 下载目标文件到本地 * * @param remoteFile * @param targetFilePath * @return * @throws AwesomeException */ public boolean downloadFile(String remoteFile, String targetFilePath, SftpProgressMonitor monitor) throws AwesomeException { ChannelSftp channelSftp = null; try { channelSftp = this.objectPool.borrowObject(); // 如果不存在目标文件夹 if (!exist(channelSftp, remoteFile)) { // 不用下载了 return false; } File targetFile = new File(targetFilePath); try (FileOutputStream outputStream = new FileOutputStream(targetFile)) { if (Objects.nonNull(monitor)) { channelSftp.get(remoteFile, outputStream, monitor); } else { channelSftp.get(remoteFile, outputStream); } } return true; } catch (Exception e) { throw new AwesomeException(500, "upload file fail"); } finally { if (Objects.nonNull(channelSftp)) { this.objectPool.returnObject(channelSftp); } } } /** * 下载文件 * * @param remoteFile * @param outputStream * @return * @throws AwesomeException */ public boolean downloadFile(String remoteFile, OutputStream outputStream) throws AwesomeException { return downloadFile(remoteFile, outputStream, null); } /** * 下载文件 * * @param remoteFile * @param outputStream * @param monitor * @return * @throws AwesomeException */ public boolean downloadFile(String remoteFile, OutputStream outputStream, SftpProgressMonitor monitor) throws AwesomeException { ChannelSftp channelSftp = null; try { channelSftp = this.objectPool.borrowObject(); // 如果不存在目标文件夹 if (!exist(channelSftp, remoteFile)) { // 不用下载了 return false; } if (Objects.nonNull(monitor)) { channelSftp.get(remoteFile, outputStream, monitor); } else { channelSftp.get(remoteFile, outputStream); } return true; } catch (Exception e) { throw new AwesomeException(500, "upload file fail"); } finally { if (Objects.nonNull(channelSftp)) { this.objectPool.returnObject(channelSftp); } } } /** * 创建文件夹 * * @param channelSftp * @param dir * @return */ protected boolean mkdirs(ChannelSftp channelSftp, String dir) { try { String pwd = channelSftp.pwd(); if (StringUtils.contains(pwd, dir)) { return true; } String relativePath = StringUtils.substringAfter(dir, pwd); String[] dirs = StringUtils.splitByWholeSeparatorPreserveAllTokens(relativePath, "/"); for (String path : dirs) { if (StringUtils.isBlank(path)) { continue; } try { channelSftp.cd(path); } catch (SftpException e) { channelSftp.mkdir(path); channelSftp.cd(path); } } return true; } catch (Exception e) { return false; } } /** * 判断文件夹是否存在 * * @param channelSftp * @param dir * @return */ protected boolean exist(ChannelSftp channelSftp, String dir) { try { channelSftp.lstat(dir); return true; } catch (Exception e) { return false; } }
集成到SpringBoot中
我们可以通过java config
的方式,把我们已经实现好的SftpClient
类实例化到Spring IOC
容器中来管理,以便让开发人员在整个项目中通过@Autowired
的方式就可以直接使用。
配置
import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; /** * @author zouwei * @className SftpProperties * @date: 2022/8/19 下午12:12 * @description: */ @Data @Configuration @ConfigurationProperties(prefix = "sftp.config") public class SftpProperties { // 用户名 private String username; // 密码 private String password; // 主机名 private String host; // 端口 private int port; // 密钥 private String privateKeyFile; // 口令 private String passphrase; // 通道链接超时时间 private int timeout; // 链接池配置 private PoolConfig pool; @Data public static class PoolConfig { //最大空闲实例数,空闲超过此值将会被销毁淘汰 private int maxIdle; // 最小空闲实例数,对象池将至少保留2个空闲对象 private int minIdle; //最大对象数量,包含借出去的和空闲的 private int maxTotal; //对象池满了,是否阻塞获取(false则借不到直接抛异常) private boolean blockWhenExhausted; // BlockWhenExhausted为true时生效,对象池满了阻塞获取超时,不设置则阻塞获取不超时,也可在borrowObject方法传递第二个参数指定本次的超时时间 private long maxWaitMillis; // 创建对象后是否验证对象,调用objectFactory#validateObject private boolean testOnCreate; // 借用对象后是否验证对象 validateObject private boolean testOnBorrow; // 归还对象后是否验证对象 validateObject private boolean testOnReturn; // 定时检查期间是否验证对象 validateObject private boolean testWhileIdle; //定时检查淘汰多余的对象, 启用单独的线程处理 private long timeBetweenEvictionRunsMillis; //jmx监控,和springboot自带的jmx冲突,可以选择关闭此配置或关闭springboot的jmx配置 private boolean jmxEnabled; } }
java Bean注入
import com.example.awesomespring.exception.AwesomeException; import com.example.awesomespring.sftp.SftpClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author zouwei * @className SftpConfig * @date: 2022/8/19 下午12:12 * @description: */ @Configuration public class SftpConfig { @Autowired private SftpProperties properties; // 创建SftpClient对象 @Bean(destroyMethod = "close") @ConditionalOnProperty(prefix = "sftp.config") public SftpClient sftpClient() throws AwesomeException { return new SftpClient(properties); } }
通过以上代码,我们就可以在项目的任何地方直接使用SFTP客户端来上传和下载文件了。
以上就是SpringBoot怎么集成SFTP客户端实现文件上传下载的详细内容,更多请关注Work网其它相关文章!