高吞吐SFTP连接池设计方案

背景

在现代的数据驱动环境中,安全文件传输协议(SFTP)扮演着至关重要的角色,它提供了一种安全、可靠的文件传输方式。我们目前项目是一个大型数据集成平台,跟上下游有很多文件对接是通过SFTP协议,当需要处理大量文件或进行大规模数据迁移时,我们常常会遇到因上下游服务器的限制导致连接访问被中断或者文件传输中断,大大影响系统的文件传输效率。常见的报错例子 com.jcraft.jsch.JSchException: Session.connect: java.net.SocketException: Connection resetcom.jcraft.jsch.JSchException: connection is closed by foreign hostcom.jcraft.jsch.JSchException: session is downcom.jcraft.jsch.JSchException: channel is not opened 等。这些连接问题也一直困扰着我们,那我们有没有办法在上下游有限的资源去最大提高文件并行处理能力呢?目前网上的连接池方案只是简单解决了SSH Session复用,但是并没有解决同一个SSH Session对象中的SFTP Channel复用问题,所以当文件量大的时候,还是会因为服务器的SSH Session数量限制从而影响文件并行传输效率。最节省资源又能提高文件并行处理方式的做法应该是同时复用SSH Session以及其SFTP Channel,这样最大并行可以处理的文件数就是 SSH Session数*SFTP Channel数,而不是直接每个文件处理线程开启一个SSH Session,只复用SSH Session

问题原因

所有上下游的SFTP服务器都是连接限制,一般来说默认情况下,常见的Linux服务器默认MaxSessions10MaxStartups10:30:60

这里简单对MaxSessionsMaxStartups两个配置参数解释下,这个对我们理解SFTP很重要。

  • MaxStartups 10:30:60:这里指服务器开始时可以进行的最大未经身份验证的连接数是10,当超过第一个数字的连接数时(10),SSHD 将开始随机地拒绝新的连接,在达到第三个数字时(60),SSHD 将拒绝所有新的连接。注意,这里是拒绝未经身份认证的连接数,已经登陆的不会限制,也就是客户端如果同时创建一堆连接等待认证的话,等待的数量超过这个配置就会被拒绝
  • MaxSessions 10: 这里是指每个SSH连接可以创建多少个通信通道,例如JSCH中的ChannelSftp文件处理通道。

如果上下游不对其SFTP服务器做任何设置,那么其服务器最大允许同时并行10个SSH连接(超过10个连接服务器SSHD服务就会随机中断连接,服务开始不稳定),每个SSH连接最多可以打开10SFTP Channel

这里要说下MaxSessionsMaxStartups参数对Java SFTP客户端的影响

以下以目前常用的SFTP框架JSCH为例子。 MaxStartups 和 MaxSessions 是 SSH 守护进程(SSHD)的配置选项,而 JSch 中的 Session 和 ChannelSftp 是 Java 客户端连接到 SSH 服务的对象。它们之间的关系如下:

  • MaxStartups:这是 SSHD 配置的一部分,用于控制并发未经身份验证的连接的数量。这对于防止暴力破解攻击非常有用,因为它限制了同时尝试身份验证的连接数。这个设置对于 JSch 客户端来说,主要影响的是当你尝试并发创建多个 Session 对象(即多个 SSH 连接)时,服务器端可能会开始拒绝额外的连接。
  • MaxSessions:这也是 SSHD 配置的一部分,用于限制每个网络连接(即每个 SSH Session)可以打开的最大会话(channel)数量。这个设置对于 JSch 客户端来说,主要影响的是在一个 Session 对象上你可以创建多少个 ChannelSftp 对象(或其他类型的 Channel 对象)。如果你尝试超过 MaxSessions 的限制创建更多的 Channel,服务器端可能会拒绝新的 Channel 创建请求。

总的来说,MaxStartupsMaxSessions 是服务器端的限制,它们影响了客户端(例如使用 JSch 的应用程序)可以并行创建多少个 SessionChannel。如果我们要设计一个需要处理大量并发 SFTP 传输的系统,需要考虑这些限制,并在必要时调整服务器的 SSHD 配置。

以文件下载为例,这里用现实中的模型简单说下目前我们数据集成平台下载文件的场景
我们数据集成平台需要从不同的上游的下载文件,A、B、C公司相当于我们的上游系统,某东快递相当于我们的数据集成系统。如下图所示,以其中一个快递公司的视角,某东快递需要对接多个商家收件,如果想提高收件速度,最理想就是安排不同的员工并行到不同的商家去取件,这样才可以同时最快收件;而以其中一个商家的视角,A公司要应对不同快递公司的并行取件请求,必然需要对人员有些限制,不然快递员太多可能会影响公司的正常运作,这也就是为什么SFTP服务器需要限制外部连接。

这里用现实生活中的场景来类比我们数据集成平台和上游系统进行文件下载交互的模型,方便大家理解SFTP服务器的限制,以及面对上游服务器的限制怎么最大化我们的传输效率。P.S:以下内容是以我们数据集成平台的视角去陈述。

1对1模型(对接一个上游下载文件)

因为我们目的是提升我们数据集成平台的并行文件传输效率,所以这里是以我们数据集成平台为参照物,假设只跟一个上游交互的场景。(这里为了简单化说明,假设上游服务器每个用户允许最大并发连接是1,打开最大会话数量是2)。

用例说明: 某东快递类比我们的数据集成平台,我们平台要去上游收件,而A公司类比我们要对接的其中一个上游。A公司的门禁类比上游服务器的限制规则。某东快递的快递员类比数据集成平台需要创建的SSH连接,每个快递员可以同时搬运几个货物类比每个SSH连接打开的SFTP Channel数量。


场景:
某东快递需要上门到A公司取件,A公司有三个件在仓库,A公司有以下门禁要求,不满足就会被拒绝访问:
1.同一个时间允许同一家物流公司的3个快递员进入
2.每个工人不能取超过一个货物

某东快递如果想取件时效性高,在快递员充足的情况下,最理想是派3个快递员同时取件,每人拿一个就可以一次拿完。但是由于A公司有门禁要求,3名快递员同时取取件,会有一位快递员会被保安拒绝,造成人员浪费。因此只需要同时派2名快递员,一个人拿2个,另一个人拿一个就可以。这样子就能一次拿完,也不会人员浪费,时效性也高。

但是真实情况,在对接上游时,对方一般都不会告诉我们有什么限制,通常都是被对方拦截才知道,而且快递员也是有成本的。所以我们作为某东快递,如果一直派快递员去A公司取件,除了可能派出去的快递员会被拦截,还会造成人员浪费。因此我们需要一种灵活配置的方式,可以基于对接的上游,灵活分配符合需求的快递员,这样可以在符合上游限制范围内,最大化我们的取件效率。

我们需要有一种可配置的池化技术,除了可以重复利用快递员,还需要在对方限制的范围基于以下原则合理分配:

  1. 并行派出的快递员数<=对方限制同一时间允许相同公司进入的快递员数
  2. 每位快递员并行取件的数量<=对方限制每个快递员单次同时取件的数量

用上面的例子转化成计算机术语,就是需要我们数据集成平台满足以下条件:

  1. 为A上游同一时间不能建立超过2SSH Session
  2. 每个SSH Session最多只能建立一个Sftp Channel

1对多模型(对接多个上游下载文件)

1对多模型是1对1模型的延伸,因为我们目的是提升我们数据集成平台的并行文件传输效率,所以这里是以我们数据集成平台为参照物,假设只跟多个上游交互的场景。(这里为了简单化说明,假设上游服务器每个用户允许最大并发连接是1,打开最大会话数量是2)。

用例说明: 跟以上1对1模型类似,某东快递类比我们的数据集成平台,我们平台要去多个上游收件,而A、B公司类比我们要对接的两个上游。A、B公司的门禁类比上游服务器的限制规则。某东快递的快递员类比数据集成平台需要创建的SSH连接,每个快递员可以同时搬运几个货物类比每个SSH连接打开的SFTP Channel数量。


场景:
某东快递需要上门到A、B公司取件,A、B公司各有三个件在仓库,A、B公司均有以下门禁要求,不满足就会被拒绝访问:
1.同一个时间允许同一家物流公司的3个快递员进入
2.每个工人不能取超过一个货物

跟1对1模型类似,某东快递如果想取件时效性高,在快递员充足的情况下,最理想是同时为两家公司各派3名快递员到A、B公司取件,每人拿一个就可以一次拿完。但是由于A、B公司有门禁要求,3名快递员同时取取件,会有一位快递员会被保安拒绝,造成人员浪费。因此只需要同时各派2名快递员,一个人拿2个,另一个人拿一个就可以。这样子就能一次拿完,也不会人员浪费,时效性也高。

因为这里是一对多模型,跟上面会有一点不一样的地方,我们除了需要上面提到的可配置的池化技术,还需要针对不同上下游做资源隔离,基本原则如下:

  1. 并行派出的快递员数<=对方限制同一时间允许相同公司进入的快递员数
  2. 每位快递员并行取件的数量<=对方限制每个快递员单次同时取件的数量
  3. 同一时间为每个公司分配的快递员是独立的,不会资源竞争

用上面的例子转化成计算机术语,就是需要我们数据集成平台:

  1. 为A、B上游同一时间不能超过2SSH Session
  2. 每个SSH Session最多只能建立一个Sftp Channel
  3. A、B的SSH Session连接池是隔离的,不会互相竞争

解决方案

基于前文提到的思路,我们已经有了可配置连接池的雏形,抽象出来的连接池模型需要符合如下条件:

  1. 为每台对接的SFTP服务器同一时间不能分配超过XSSH Session
  2. 为每台对接的SFTP服务器分配的每个SSH Session最多只能建立YSFTP Channel
  3. 为每台对接的SFTP服务器分配的SSH Session连接池是隔离的,不会互相竞争

以上X代表的我们需要配置的最大SSH Session连接数,Y代表我们需要配置的每个SSH Session最大能创建的Channel数,这两个值都需要根据对接的SFTP服务器的限制和自身服务器情况去调整。

为了避免连接池浪费SSH Session资源,我们还需要做进一步优化,就是要限制每个SSH Session需要创建满YSFTP Channel才创建一个新的SSH Session,而不是每次请求都先创建一个新的SSH Session,这样会导致每个SSH Session都没被充分利用,而且也会增加系统的负担。

优化后的连接池模型如下:

  1. 为每台对接的SFTP服务器同一时间不能分配超过XSSH Session
  2. 为每台对接的SFTP服务器分配的每个SSH Session最多只能建立YSFTP Channel
  3. 为每台对接的SFTP服务器分配的SSH Session连接池是隔离的,不会互相竞争
  4. 多个线程请求获取对某台SFTP服务器SSH Session时,如果当前SSH Session池中的第一个Session的SFTP Channel 数还没超过Y,则会返回同一个SSH Session实例,假如当前的SSH Session池的第一个Session创建的SFTP Channel数已经超过配置的Y值,则创建第二个SSH Session,如此类推,直到创建第N个SSH Session,N <X

对于每个上游分配的连接池请求流程如下:
AI生产那个的流程图

代码实现

SFTP Session Pool

这是一个 SFTP 会话池的实现,它使用了 JSch 库来创建和管理 SFTP 会话(Session)和通道(ChannelSftp)。这个会话池的主要目的是复用已经创建的 SFTP 会话和通道,以提高文件传输的效率。

以下是这个类的主要功能和设计考虑:

  • 会话和通道的创建和管理: 这个类使用 getSession 方法来从池中获取一个会话,如果没有可用的会话,它会创建一个新的会话。然后,它使用 getChannel 方法来从一个会话中打开一个新的 SFTP 通道。

  • 并发控制: 这个类使用了 Semaphore 对象来限制每个主机的最大会话数(maxSessionsPerHostSemaphore)以及每个会话的最大通道数(channelCounts)。这样可以防止过多的并发连接导致系统资源耗尽。

  • 异常处理: 如果在创建会话或打开通道时发生异常,这个类会从池中移除无效的会话,并释放相应的 Semaphore 许可。这样可以确保池中只包含有效的会话和通道。

  • 会话和通道的复用: 如果一个会话的所有通道都已经关闭,这个类会关闭这个会话并从池中移除它。否则,它会将这个会话放回池中,以便后续的请求可以复用它。

  • Key锁: 这是用来防止多个持有相同的(host+port+username)的线程取Session的时候会出现并发,导致创建的Session数比预期多。

  • 会话释放 - 如果当前Session仍有Channel在使用(意味着其他线程在使用该Session),则会放回连接池。否则,会关闭。

这个类的设计提供了一个有效的方式来管理 SFTP 会话和通道,使得它们可以被多个请求复用,从而提高文件传输的效率。

package pool.common.utils;

import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import pool.exceptions.NoAvailableSessionException;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

@Slf4j
public class SftpSessionPool {
    private ConcurrentHashMap<String, ConcurrentLinkedQueue<Session>> sessions;
    private ConcurrentHashMap<Session, Semaphore> channelCounts;
    private int maxChannelsPerSession;
    private Semaphore maxSessionsPerHostSemaphore;
    ReentrantLockPool lock = new ReentrantLockPool();


    public SftpSessionPool(int maxSessionsPerHost, int maxChannelsPerSession) {
        this.sessions = new ConcurrentHashMap<>();
        this.channelCounts = new ConcurrentHashMap<>();
        this.maxChannelsPerSession = maxChannelsPerSession;
        this.maxSessionsPerHostSemaphore = new Semaphore(maxSessionsPerHost, true);
    }

    /**
     * get session with keyLock
     *
     * @param host
     * @param port
     * @param username
     * @param password
     * @param timeout
     * @param unit
     * @param lockKey
     * @return
     * @throws JSchException
     * @throws InterruptedException
     */
    public Session getSession(String host, int port, String username, String password, long timeout, TimeUnit unit, String lockKey) throws JSchException, InterruptedException {
        String sessionKey = host + ":" + port + ":" + username;
        if (StringUtils.hasLength(lockKey)) {
            // waiting for the available lock
            while (!lock.lock(lockKey)) {
                Thread.sleep(1000); // wait a bit before retrying
                log.debug("Thread {} failed to get lock ", Thread.currentThread().getId());
            }
            log.debug("Thread {} get lock successfully", Thread.currentThread().getId());
        }
        try {
            return getSessionFromPool(host, port, username, password, timeout, unit, sessionKey);
        } finally {
            if (StringUtils.hasLength(lockKey)) {
                lock.unlock(lockKey);
                log.debug("Thread {} release lock successfully", Thread.currentThread().getId());
            }
        }

    }

    private Session getSessionFromPool(String host, int port, String username, String password, long timeout, TimeUnit unit, String sessionKey) throws JSchException, InterruptedException {
        ConcurrentLinkedQueue<Session> hostSessions = sessions.computeIfAbsent(sessionKey, k -> new ConcurrentLinkedQueue<>());
        long endTime = System.nanoTime() + unit.toNanos(timeout);

        while (System.nanoTime() < endTime) {

            for (Session session : hostSessions) {
                if (!session.isConnected()) {
                    // This session is no longer valid, remove it from the pool
                    log.warn("This session is no longer valid, remove it from the pool ,sessionId: {}, session detail: {}", session, sessionKey);
                    hostSessions.remove(session);
                    channelCounts.remove(session);
                    maxSessionsPerHostSemaphore.release();
                    continue;
                }

                Semaphore channelSemaphore = channelCounts.get(session);
                if (channelSemaphore != null && channelSemaphore.tryAcquire()) {
                    return session;
                }
            }

            if (maxSessionsPerHostSemaphore.tryAcquire()) {
                try {
                    Session session = createNewSession(host, port, username, password);
                    hostSessions.add(session);
                    channelCounts.put(session, new Semaphore(maxChannelsPerSession - 1)); // one channel is already in use
                    return session;
                } catch (JSchException e) {
                    maxSessionsPerHostSemaphore.release();
                    throw e;
                }
            }

            Thread.sleep(100); // wait a bit before retrying
        }

        throw new NoAvailableSessionException("Timeout while waiting for a session");
    }


    private Session createNewSession(String host, int port, String username, String password) throws JSchException {
        JSch jsch = new JSch();
        Session session = jsch.getSession(username, host, port);
        session.setPassword(password);
        session.setConfig("StrictHostKeyChecking", "no"); // Enable StrictHostKeyChecking
        session.setTimeout(5000); // Set connection timeout to 5 seconds
        session.connect();
        return session;
    }

    public ChannelSftp getChannel(Session session) throws JSchException {
        try {
            return (ChannelSftp) session.openChannel("sftp");
        } catch (JSchException e) {
            if (!session.isConnected()) {
                // The session is no longer valid, remove it from the pool
                String key = session.getHost() + ":" + session.getUserName();
                ConcurrentLinkedQueue<Session> sessions = this.sessions.get(key);
                sessions.remove(session);
                channelCounts.remove(session);
                maxSessionsPerHostSemaphore.release();
            }
            throw e;
        }

    }


    /**
     * Return the session if current session is till used by another thread ,otherwise close it.
     * @param session
     */
    public synchronized void returnOrCloseSession(Session session) {
        Semaphore channelSemaphore = channelCounts.get(session);
        if (channelSemaphore != null) {
            channelSemaphore.release();
            if (channelSemaphore.availablePermits() < maxChannelsPerSession) {
                // still channels left, put back to the pool
                return;
            }
        }

        // close the session
        session.disconnect();
        channelCounts.remove(session);
        sessions.values().forEach(queue -> queue.remove(session));
        maxSessionsPerHostSemaphore.release();
    }


}

SftpConnectionBuilder

为了对接不同的上游,要实现连接池隔离,这里创建一个Builder给调用者,通过不同的Key(host+port+username)可以复用独立的连接池,这里是因为考虑到有些SFTP Server是多个用户复用的,不同的用户文件传输需求应该是可以并行处理,因此这里连接池要资源隔离。

package pool.common.utils;

import java.util.concurrent.ConcurrentHashMap;

public class SftpConnectionBuilder {

    private static volatile SftpConnectionBuilder instance;
    private ConcurrentHashMap<String, SftpSessionPool> sessionPools;
    private int maxSessionsPerHost;
    private int maxChannelsPerSession;

    private SftpConnectionBuilder(int maxSessionsPerHost, int maxChannelsPerSession) {
        this.sessionPools = new ConcurrentHashMap<>();
        this.maxSessionsPerHost = maxSessionsPerHost;
        this.maxChannelsPerSession = maxChannelsPerSession;
    }

    public static SftpConnectionBuilder getInstance(int maxSessionsPerHost, int maxChannelsPerSession) {
        if (instance == null) {
            synchronized (SftpConnectionBuilder.class) {
                if (instance == null) {
                    instance = new SftpConnectionBuilder(maxSessionsPerHost, maxChannelsPerSession);
                }
            }
        }
        return instance;
    }

    public SftpSessionPool getSessionPool(String host, int port, String username) {
        String key = host + ":" + port + ":" + username;
        return sessionPools.computeIfAbsent(key, k -> new SftpSessionPool(maxSessionsPerHost, maxChannelsPerSession));
    }
}

SftpFileProcessThread

这里创建一个文件处理线程来模拟文件处理,这里获取Session对象时使用host + ":" + username作为锁是防止获取Session时并发导致创建的连接数超过预期。

package pool.demo;

import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.Session;
import lombok.extern.slf4j.Slf4j;
import pool.common.utils.SftpSessionPool;
import pool.exceptions.NoAvailableSessionException;

import java.util.Vector;
import java.util.concurrent.TimeUnit;

@Slf4j
public class SftpFileProcessThread extends Thread {
    private int maxRetries = 3;
    private SftpSessionPool pool;
    private String host;
    private int port;
    private String username;
    private String password;
    private String remoteSftpPath;

    public SftpFileProcessThread(SftpSessionPool pool, String host, int port, String username, String password, String remoteSftpPath, int maxRetries) {
        this.pool = pool;
        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
        this.remoteSftpPath = remoteSftpPath;
        this.maxRetries = maxRetries;
    }

    public void run() {
        Session session = null;
        ChannelSftp channelSftp = null;
        try {
            int retries = 0;
            while (true) {
                try {
                    // session = pool.getSession(host, port, username, password, 10, TimeUnit.SECONDS, "");
                    session = pool.getSession(host, port, username, password, 10, TimeUnit.SECONDS, host + ":" + username); //get session with lock
                    break; // if getSession() is successful, break the loop
                } catch (NoAvailableSessionException e) {
                    if (++retries > this.maxRetries) {
                        throw e; // if exceeded max retries, rethrow the exception
                    }
                    log.info("Thread {}:Failed to get session in this round. Start to retry now. Current retry count is {}. Request session detail: {}", this.getId(), retries, this.host + ":" + this.username);
                    // if not exceeded max retries, sleep for a while and then continue the loop to retry
                    Thread.sleep(5000);
                }
            }
            log.info("Thread {} got session {}. Session detail: {}", this.getId(), session, session.getHost() + ":" + session.getUserName());

            channelSftp = pool.getChannel(session);
            channelSftp.connect();
            // ... use the session and channel
            Vector<ChannelSftp.LsEntry> list = channelSftp.ls(this.remoteSftpPath);
            for (ChannelSftp.LsEntry entry : list) {
                // System.out.println(entry.getFilename());
                //Simulate file process
                Thread.sleep(5000);
            }
        } catch (Exception e) {
            log.error("Thread {} failed to process", this.getId(), e);
        } finally {
            if (channelSftp != null) {
                channelSftp.disconnect();
            }
            if (session != null) {
                pool.returnOrCloseSession(session);
                log.info("Thread {} closed session {}. Session detail: {}", this.getId(), session, this.host + ":" + this.username);
            }
        }
    }
}

SftpService

这里是程序入口,从数据库配置获取连接池配置信息,然后默认开启了10条线程去模拟请求连接池并处理文件任务,后面我们进行测试时,需要改这个线程数来演示。

package pool.services;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import pool.common.utils.SftpConnectionBuilder;
import pool.common.utils.SftpSessionPool;
import pool.dataobject.SftpConfig;
import pool.demo.SftpFileProcessThread;
import pool.repositories.SftpConfigRepository;

import java.util.List;

@Slf4j
@Service
public class SftpService {

    private final SftpConfigRepository sftpConfigRepository;

    public SftpService(SftpConfigRepository sftpConfigRepository) {
        this.sftpConfigRepository = sftpConfigRepository;
    }

    public void initializeConnectionPools() {
        // Load all SFTP configs from the database
        List<SftpConfig> configs = sftpConfigRepository.findAll();
        //Indicate how many files need to be processed in the same time
        int max_concurrent_opening_files = 10;

        String testPath = "/";
        // Initialize a connection pool for each config
        for (SftpConfig config : configs) {
            log.info("config->{}", config);

            SftpSessionPool sessionPool = SftpConnectionBuilder.getInstance(config.getMaxSessions(), config.getMaxChannels()).getSessionPool(config.getHost(), 22, config.getUsername());
            //Simulate each sftp profile is being used by multiple threads for file process
            for (int i = 0; i < max_concurrent_opening_files; i++) {
                SftpFileProcessThread thread = new SftpFileProcessThread(sessionPool, config.getHost(), config.getPort(), config.getUsername(), config.getPassword(), testPath, 0);
                thread.start();
            }
        }
    }
}

SFTP_CONFIG表

这里我创建了一张配置表,用来配置连接池的上限参数,这里JSCH的 MaxSession对应的是服务器MaxStartups参数,JSCH的MaxChannel对应的是服务器的MaxSessions参数

CREATE TABLE SFTP_CONFIG (
    ID INT AUTO_INCREMENT PRIMARY KEY,
    HOST VARCHAR(255),
    PORT INT,
    USERNAME VARCHAR(255),
    PASSWORD VARCHAR(255),
    MAXSESSIONS INT,
    MAXCHANNELS INT
);

测试连接池

1.测试连接池配置上限不超过服务器限制的正常情况

1.1 这里我把SFTP服务器的MaxStartups设置成2MaxSession设置成5。这里把SFTP 的限制设低点是方便测试。

parallels@ubuntu-linux-22-04-desktop:~$ grep -i "MaxStartups" /etc/ssh/sshd_config
MaxStartups 2
parallels@ubuntu-linux-22-04-desktop:~$ grep -i "MaxSessions" /etc/ssh/sshd_config
MaxSessions 5
parallels@ubuntu-linux-22-04-desktop:~$ 

1.2 把连接池的数据库配置改成MaxSession=3,MaxChannel =5 与上面服务器配置一致。

INSERT INTO SFTP_CONFIG (HOST, PORT, USERNAME, PASSWORD, MAXSESSIONS, MAXCHANNELS) VALUES
('192.168.50.58', 22,'parallels', 'test8808', 2, 5);

1.3 接下来我们来启动程序,会同时并行开启10个线程持有相同的连接信息去访问同一台SFTP 服务器,我们看看程序是否会符合我们预期,10个线程应该只会打开2SFTP Session,并且连接服务器不会报错。

2024-03-09 01:22:03.536  INFO 58516 --- [       Thread-3] pool.demo.SftpFileProcessThread          : Thread 35 got session com.jcraft.jsch.Session@5cae46b5. Session detail: 192.168.50.58:parallels
2024-03-09 01:22:04.072  INFO 58516 --- [       Thread-4] pool.demo.SftpFileProcessThread          : Thread 36 got session com.jcraft.jsch.Session@5cae46b5. Session detail: 192.168.50.58:parallels
2024-03-09 01:22:04.072  INFO 58516 --- [       Thread-7] pool.demo.SftpFileProcessThread          : Thread 39 got session com.jcraft.jsch.Session@5cae46b5. Session detail: 192.168.50.58:parallels
2024-03-09 01:22:05.075  INFO 58516 --- [       Thread-8] pool.demo.SftpFileProcessThread          : Thread 40 got session com.jcraft.jsch.Session@5cae46b5. Session detail: 192.168.50.58:parallels
2024-03-09 01:22:05.075  INFO 58516 --- [       Thread-6] pool.demo.SftpFileProcessThread          : Thread 38 got session com.jcraft.jsch.Session@5cae46b5. Session detail: 192.168.50.58:parallels
2024-03-09 01:22:05.191  INFO 58516 --- [       Thread-9] pool.demo.SftpFileProcessThread          : Thread 41 got session com.jcraft.jsch.Session@17e2aaef. Session detail: 192.168.50.58:parallels
2024-03-09 01:22:06.083  INFO 58516 --- [      Thread-11] pool.demo.SftpFileProcessThread          : Thread 43 got session com.jcraft.jsch.Session@17e2aaef. Session detail: 192.168.50.58:parallels
2024-03-09 01:22:06.084  INFO 58516 --- [       Thread-5] pool.demo.SftpFileProcessThread          : Thread 37 got session com.jcraft.jsch.Session@17e2aaef. Session detail: 192.168.50.58:parallels
2024-03-09 01:22:06.086  INFO 58516 --- [      Thread-12] pool.demo.SftpFileProcessThread          : Thread 44 got session com.jcraft.jsch.Session@17e2aaef. Session detail: 192.168.50.58:parallels
2024-03-09 01:22:07.084  INFO 58516 --- [      Thread-10] pool.demo.SftpFileProcessThread          : Thread 42 got session com.jcraft.jsch.Session@17e2aaef. Session detail: 192.168.50.58:parallels
2024-03-09 01:24:03.692  INFO 58516 --- [       Thread-3] pool.demo.SftpFileProcessThread          : Thread 35 closed session com.jcraft.jsch.Session@5cae46b5. Session detail: 192.168.50.58:parallels
2024-03-09 01:24:04.157  INFO 58516 --- [       Thread-4] pool.demo.SftpFileProcessThread          : Thread 36 closed session com.jcraft.jsch.Session@5cae46b5. Session detail: 192.168.50.58:parallels
2024-03-09 01:24:04.160  INFO 58516 --- [       Thread-7] pool.demo.SftpFileProcessThread          : Thread 39 closed session com.jcraft.jsch.Session@5cae46b5. Session detail: 192.168.50.58:parallels
2024-03-09 01:24:05.169  INFO 58516 --- [       Thread-8] pool.demo.SftpFileProcessThread          : Thread 40 closed session com.jcraft.jsch.Session@5cae46b5. Session detail: 192.168.50.58:parallels
2024-03-09 01:24:05.198  INFO 58516 --- [       Thread-6] pool.demo.SftpFileProcessThread          : Thread 38 closed session com.jcraft.jsch.Session@5cae46b5. Session detail: 192.168.50.58:parallels
2024-03-09 01:24:05.298  INFO 58516 --- [       Thread-9] pool.demo.SftpFileProcessThread          : Thread 41 closed session com.jcraft.jsch.Session@17e2aaef. Session detail: 192.168.50.58:parallels
2024-03-09 01:24:06.176  INFO 58516 --- [      Thread-11] pool.demo.SftpFileProcessThread          : Thread 43 closed session com.jcraft.jsch.Session@17e2aaef. Session detail: 192.168.50.58:parallels
2024-03-09 01:24:06.182  INFO 58516 --- [       Thread-5] pool.demo.SftpFileProcessThread          : Thread 37 closed session com.jcraft.jsch.Session@17e2aaef. Session detail: 192.168.50.58:parallels
2024-03-09 01:24:06.182  INFO 58516 --- [      Thread-12] pool.demo.SftpFileProcessThread          : Thread 44 closed session com.jcraft.jsch.Session@17e2aaef. Session detail: 192.168.50.58:parallels
2024-03-09 01:24:07.185  INFO 58516 --- [      Thread-10] pool.demo.SftpFileProcessThread          : Thread 42 closed session com.jcraft.jsch.Session@17e2aaef. Session detail: 192.168.50.58:parallels

从以上日志可以看出,10条线程虽然都并行持有相同的SFTP连接信息请求连接池,但是只会获取到2SFTP Session实例而不是10SFTP Session,并且当连接池里的SSH Session持有的Channel还没有达到配置的上限5个时,只会分配同一个SSH Session实例,当前连接池里的SSH Session对象持有的Channel超过5才会分配下一个SFTP Session对象,来确保SFTP Session不会浪费。因此目前连接池是符合我们预期想要的效果。

2.测试连接池配置上限超过服务器Channel限制的异常情况

2.1 服务器配置跟上面一样,MaxStartups设置成2MaxSession设置成5

parallels@ubuntu-linux-22-04-desktop:~$ grep -i "MaxStartups" /etc/ssh/sshd_config
MaxStartups 2
parallels@ubuntu-linux-22-04-desktop:~$ grep -i "MaxSessions" /etc/ssh/sshd_config
MaxSessions 5
parallels@ubuntu-linux-22-04-desktop:~$ 

2.2 把连接池的数据库配置改成MaxSession=3,MaxChannel =1 ,目的是测试当连接池配置的最大Channel数大于服务器限制时,程序多个线程并行进行文件处理会不会报错。

INSERT INTO SFTP_CONFIG (HOST, PORT, USERNAME, PASSWORD, MAXSESSIONS, MAXCHANNELS) VALUES
('192.168.50.57', 22,'parallels', 'test8808', 1, 6);

2.3 接下来我们来启动程序,会同时并行开启6个线程持有相同的连接信息去访问同一台SFTP 服务器,我们看看程序是否会符合我们预期。如果程序符合预期的话,6个线程会同时使用1个SSH Session去尝试打开6个Channel,第6个线程开始打开Channel进行文件处理的时候应该会开始报错,因为已经超过了服务器所能承受的 MaxSession=5

2024-03-09 15:30:16.725  INFO 74635 --- [           main] pool.services.SftpService                : config->SftpConfig(id=1, host=192.168.50.57, port=22, username=parallels, password=test8808, maxSessions=1, maxChannels=6)
2024-03-09 15:30:17.180  INFO 74635 --- [       Thread-6] pool.demo.SftpFileProcessThread          : Thread 38 got session com.jcraft.jsch.Session@63c4e36c. Session detail: 192.168.50.57:parallels
2024-03-09 15:30:17.257  INFO 74635 --- [       Thread-3] pool.demo.SftpFileProcessThread          : Thread 35 got session com.jcraft.jsch.Session@63c4e36c. Session detail: 192.168.50.57:parallels
2024-03-09 15:30:17.257  INFO 74635 --- [       Thread-5] pool.demo.SftpFileProcessThread          : Thread 37 got session com.jcraft.jsch.Session@63c4e36c. Session detail: 192.168.50.57:parallels
2024-03-09 15:30:17.259  INFO 74635 --- [       Thread-8] pool.demo.SftpFileProcessThread          : Thread 40 got session com.jcraft.jsch.Session@63c4e36c. Session detail: 192.168.50.57:parallels
2024-03-09 15:30:17.264  INFO 74635 --- [       Thread-7] pool.demo.SftpFileProcessThread          : Thread 39 got session com.jcraft.jsch.Session@63c4e36c. Session detail: 192.168.50.57:parallels
2024-03-09 15:30:17.268  INFO 74635 --- [       Thread-4] pool.demo.SftpFileProcessThread          : Thread 36 got session com.jcraft.jsch.Session@63c4e36c. Session detail: 192.168.50.57:parallels
2024-03-09 15:30:17.278 ERROR 74635 --- [       Thread-4] pool.demo.SftpFileProcessThread          : Thread 36 failed to process

com.jcraft.jsch.JSchException: channel is not opened.
	at com.jcraft.jsch.Channel.sendChannelOpen(Channel.java:835) ~[jsch-0.2.16.jar:0.2.16]
	at com.jcraft.jsch.Channel.connect(Channel.java:161) ~[jsch-0.2.16.jar:0.2.16]
	at com.jcraft.jsch.Channel.connect(Channel.java:155) ~[jsch-0.2.16.jar:0.2.16]
	at pool.demo.SftpFileProcessThread.run(SftpFileProcessThread.java:54) ~[classes/:na]

从上面日志可以看到,当超过服务器最大Channel配置5时,服务器就会开始拒绝服务,如何我们测试预期。因此我们连接池的配置范围一定要小于等于服务器的配置,否则可能会引起文件传输中断,也就是宁愿少配也不要多配

3.测试连接池配置上限超过服务器SSH Session限制的异常情况

3.1 服务器配置跟上面一样,MaxStartups设置成2MaxSession设置成5

parallels@ubuntu-linux-22-04-desktop:~$ grep -i "MaxStartups" /etc/ssh/sshd_config
MaxStartups 2
parallels@ubuntu-linux-22-04-desktop:~$ grep -i "MaxSessions" /etc/ssh/sshd_config
MaxSessions 5
parallels@ubuntu-linux-22-04-desktop:~$ 

3.2 把连接池的数据库配置改成MaxSession=3,MaxChannel =1 ,目的测试当连接池配置的最大Session数大于服务器限制时,程序多个线程并发创建SSH Session会不会出错。

INSERT INTO SFTP_CONFIG (HOST, PORT, USERNAME, PASSWORD, MAXSESSIONS, MAXCHANNELS) VALUES
('192.168.50.57', 22,'parallels', 'test8808', 3, 1);

3.3 修改SftpFileProcessThread代码中获取连接方式改成无锁方式,这样可以看到并发创建Session数量超过服务器限制时服务器的报错。因为MaxStartups上面我们配置了2,需要同时多个线程模拟并发创建超过2个等待验证的Session才能看到效果,如果使用有锁模式,创建Session就会变成串行,验证通过才会到下一个Session,服务器就不会有多个等待的验证的Session

 session = pool.getSession(host, port, username, password, 10, TimeUnit.SECONDS, "");
 //session = pool.getSession(host, port, username, password, 10, TimeUnit.SECONDS, host + ":" + username); //get session with lock

3.4 接下来我们来启动程序,会同时并行开启3个线程持有相同的连接信息去访问同一台SFTP 服务器,我们看看程序是否会符合我们预期。如果程序符合预期的话,3个线程会同时创建3个SSH Session,第3个线程尝试创建SSH Session 时应该会报错,因为超过了服务器最大 并发Session数限制MaxStartups=2

2024-03-09 15:55:23.367  INFO 7577 --- [           main] pool.services.SftpService                : config->SftpConfig(id=1, host=192.168.50.57, port=22, username=parallels, password=test8808, maxSessions=3, maxChannels=1)
2024-03-09 15:55:23.455 ERROR 7577 --- [       Thread-5] pool.demo.SftpFileProcessThread          : Thread 37 failed to process

com.jcraft.jsch.JSchException: Session.connect: java.net.SocketException: Connection reset
	at com.jcraft.jsch.Session.connect(Session.java:570) ~[jsch-0.2.16.jar:0.2.16]
	at com.jcraft.jsch.Session.connect(Session.java:199) ~[jsch-0.2.16.jar:0.2.16]
	at pool.common.utils.SftpSessionPool.createNewSession(SftpSessionPool.java:132) ~[classes/:na]
	at pool.common.utils.SftpSessionPool.getSessionFromPool(SftpSessionPool.java:109) ~[classes/:na]
	at pool.common.utils.SftpSessionPool.getSession(SftpSessionPool.java:57) ~[classes/:na]
	at pool.demo.SftpFileProcessThread.run(SftpFileProcessThread.java:39) ~[classes/:na]
Caused by: java.net.SocketException: Connection reset
	at java.net.SocketInputStream.read(SocketInputStream.java:210) ~[na:1.8.0_231]
	at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[na:1.8.0_231]
	at java.net.SocketInputStream.read(SocketInputStream.java:224) ~[na:1.8.0_231]
	at com.jcraft.jsch.IO.getByte(IO.java:84) ~[jsch-0.2.16.jar:0.2.16]
	at com.jcraft.jsch.Session.connect(Session.java:276) ~[jsch-0.2.16.jar:0.2.16]
	... 5 common frames omitted

2024-03-09 15:55:23.900  INFO 7577 --- [       Thread-3] pool.demo.SftpFileProcessThread          : Thread 35 got session com.jcraft.jsch.Session@4777f0b1. Session detail: 192.168.50.57:parallels
2024-03-09 15:55:23.900  INFO 7577 --- [       Thread-4] pool.demo.SftpFileProcessThread          : Thread 36 got session com.jcraft.jsch.Session@326d5e63. Session detail: 192.168.50.57:parallels

从上面日志可以看到,3个线程同一时间只有2个线程能成功创建Session,另外一个线程创建Session就报错,这是因为超过了服务器最大并发Session数限制而被拒绝访问。

总结

我们如果使用这种可配置连接池进行访问,对接上游时最好时最好跟上游确认他们服务器可以承受的Session数量和Channel数量是多少,宁愿少配也不要多配。但是对于一般上游,如果使用的是Linux服务器,默认值就是上面一开始提到的MaxSessions=10,MaxStartups=10:30:60,我们客户端连接池保守配置可以设置成MaxSession=3,MaxChannel=10,这样子最大并行可以处理30个文件,MaxSession一般不建议设置太多,非常消耗系统资源。也有很多上游是使用其他SFTP服务管理工具,但是基本限制参数差不多。以上连接池还有很多需要优化的地方,大家可以根据需要自己进行优化。

Github源代码

https://github.com/EvanLeung08/sftp-session-pool

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/444379.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

如何修复advapi32.dll丢失无法启动程序的问题

如果你在运行Windows程序时遇到了“advapi32.dll丢失无法启动程序”的错误消息&#xff0c;那么这意味着你的计算机缺少这个DLL文件。在本文中&#xff0c;我们将提供一些解决方案&#xff0c;帮助你解决这个问题并恢复计算机的正常运行。 一.advapi32.dll丢失电脑的提示 关于…

C语言编译成库文件的要求

keil编译成库文件 在Keil中&#xff0c;将C语言源文件编译成库文件通常需要进行以下步骤&#xff1a; 创建一个新的Keil项目&#xff0c;并将所需的C语言源文件添加到该项目中。 在项目设置中配置编译选项&#xff0c;确保生成的目标文件符合库文件的标准格式。 编译项目&…

React Three Fiber快速入门

https://threejs-journey.com/lessons/what-are-react-and-react-three-fiber#学习笔记 1.基础知识 resize 填充模版 构建第一个场景 we didn’t have to create a scenewe didn’t have to create the webglrenderthe scene is being rendered on each framethe default…

C++字符串操作【超详细】

零.前言 本文将重点围绕C的字符串来展开描述。 其中&#xff0c;对于C/C中字符串的一些区别也做出了回答&#xff0c;并对于C的&#xff08;string库&#xff09;进行了讲解&#xff0c;最后我们给出字符串的不同表达形式。 开发环境&#xff1a; VS2022 一.字符串常量跟字…

leetcode 热题 100_相交链表

题解一&#xff1a; 哈希表&#xff1a;两链表出现的第一个相同的值就是相交节点&#xff0c;因此我们先用哈希记录链表A所有出现过的值&#xff0c;再遍历链表B查找哈希表&#xff0c;找出第一个相同的值即为结果。 import java.util.HashSet;public class Solution {public …

如何使用Hexo搭建个人博客

文章目录 如何使用Hexo搭建个人博客环境搭建连接 Github创建 Github Pages 仓库本地安装 Hexo 博客程序安装 HexoHexo 初始化和本地预览 部署 Hexo 到 GitHub Pages开始使用发布文章网站设置更换主题常用命令 插件安装解决成功上传github但是web不更新不想上传文章处理方式链接…

【解读】OWASP 大语言模型(LLM)安全测评基准V1.0

大语言模型&#xff08;LLM&#xff0c;Large Language Model&#xff09;是指参数量巨大、能够处理海量数据的模型, 此类模型通常具有大规模的参数&#xff0c;使得它们能够处理更复杂的问题&#xff0c;并学习更广泛的知识。自2022 年以来&#xff0c;LLM技术在得到了广泛的应…

ReactNative项目构建分析与思考之react-native-gradle-plugin

前一段时间由于业务需要&#xff0c;接触了下React Native相关的知识&#xff0c;以一个Android开发者的视角&#xff0c;对React Native 项目组织和构建流程有了一些粗浅的认识&#xff0c;同时也对RN混合开发项目如何搭建又了一点小小的思考。 RN环境搭建 RN文档提供了两种…

多维时序 | Matlab实现BiGRU-Mutilhead-Attention双向门控循环单元融合多头注意力机制多变量时序预测

多维时序 | Matlab实现BiGRU-Mutilhead-Attention双向门控循环单元融合多头注意力机制多变量时序预测 目录 多维时序 | Matlab实现BiGRU-Mutilhead-Attention双向门控循环单元融合多头注意力机制多变量时序预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.多维时序 …

字典Trie树

字典树 : 概念 建字典树 查询 : 代码模板 : const int N100010; int n; char s[N]; int ch[N][26],cnt[N],idx;void insert(char *s){int p0;for(int i0; s[i]; i ){int js[i]-a;//字母映射if(!ch[p][j])ch[p][j]idx;pch[p][j];}cnt[p];//插入次数 } int query(char *s){i…

线上应用部署了两台load为1四核服务器

线上应用部署了两台服务器。 项目发布后&#xff0c;我对线上服务器的性能进行了跟踪&#xff0c;发现一台负载为3&#xff0c;另一台负载为1&#xff0c;其中一台四核服务器已经快到瓶颈了&#xff0c;所以我们紧急排查原因。 1、使用TOP命令查看占用CPU较大的负载和进程&…

《C语言都有哪些字符串处理函数?》

目录 17个字符串处理函数 1. gets()--读 2.fgets()--从指定文件内读 3.puts()--输出 4.fputs()--写入到指定文件中 5.strlen()--计算字符串长度 6.strcpy()--复制 7.strncpy()--复制前n个字符 8.strcat()--字符串连接 9.strncat()--将前n个字符连接 10.strcmp()--比…

Flink概述

1.什么是Flink 是一个框架和分布式处理引擎&#xff0c;用于对无界和有界数据流进行有状态计算。 官网&#xff1a;Flink 2.Flink的发展历史 Flink起源于一个叫作Stratosphere的项目&#xff0c;它是由3所地处柏林的大学和欧洲其他一些大学在2010~2014年共同进行的研究项目&a…

从零开始:神经网络(2)——MP模型

声明&#xff1a;本文章是根据网上资料&#xff0c;加上自己整理和理解而成&#xff0c;仅为记录自己学习的点点滴滴。可能有错误&#xff0c;欢迎大家指正。 神经元相关知识&#xff0c;详见从零开始&#xff1a;神经网络——神经元和梯度下降-CSDN博客 1、什么是M-P 模型 人…

物联网云原生云边协同

文章目录 一、物联网平台设计1.物联网平台设计2.物联网平台实现 二、部署环境1.节点配置2.版本信息 三、物联网平台部署1.部署 Kubernetes 集群2.部署 KubeEdge3.部署 ThingsBoard 集群4.部署 ThingsBoard Edge4.1.创建 Edge 实例4.2.部署 PostgreSQL4.3.创建数据库4.4.部署 Th…

构建高效可靠的消息队列系统:设计与实现

✨✨谢谢大家捧场&#xff0c;祝屏幕前的小伙伴们每天都有好运相伴左右&#xff0c;一定要天天开心哦&#xff01;✨✨ &#x1f388;&#x1f388;作者主页&#xff1a; 喔的嘛呀&#x1f388;&#x1f388; 目录 一、引言 二、设计目标 2.1、高可用性 1. 集群搭建 1.1 …

MACBOOK PRO M2 MAX 安装Stable Diffusion及文生图实例

以前偶尔会使用Midjourney生成一些图片&#xff0c;现在使用的头像就是当时花钱在Midjourney上生成的。前段时间从某鱼上拍了一台性价比还不错的macbook&#xff0c;想着不如自己部署Stable Diffusion&#xff08;以下简称SD&#xff09;尝试一下。 网上有很多教程&#xff0c…

数组的内存执行原理

一.Java内存分配介绍 JVM虚拟机会在内存中执行程序 java内存分配介绍 方法区&#xff0c;栈&#xff0c;堆 首先将编译过后的.class文件送入方法区中。当类开始运行时将方法调入栈内存中&#xff0c;变量也是属于方法的&#xff0c;因此同方法一起进入栈内存中。当main方法要…

日期问题---算法精讲

前言 今天讲讲日期问题&#xff0c;所谓日期问题&#xff0c;在蓝桥杯中出现众多&#xff0c;但是解法比较固定。 一般有判断日期合法性&#xff0c;判断是否闰年&#xff0c;判断日期的特殊形式&#xff08;回文或abababab型等&#xff09; 目录 例题 题2 题三 总结 …

人工智能|机器学习——K-means系列聚类算法k-means/ k-modes/ k-prototypes/ ......(划分聚类)

1.k-means聚类 1.1.算法简介 K-Means算法又称K均值算法&#xff0c;属于聚类&#xff08;clustering&#xff09;算法的一种&#xff0c;是应用最广泛的聚类算法之一。所谓聚类&#xff0c;即根据相似性原则&#xff0c;将具有较高相似度的数据对象划分至同一类簇&#xff0c;…
最新文章