srs集群下行edge处理逻辑

官方关于源站集群的介绍:

Origin Cluster | SRS

下行边缘是指观众端从边缘edge拉流,边缘edge回源到源站origin节点拉流,然后再

把流转给客户端

边缘处理类SrsPlayEdge

当服务器收到播放请求时,创建对应的consumer消费者。在创建消费者consumer时会判断当前服务器的类型,如果服务器是边缘edge,就通过play_edge进行处理。每一个SrsLiveSource都有一个对应的 SrsPlayEdge *play_edge,如果配置文件指定了remote才开启边缘逻辑。

srs_error_t SrsLiveSource::create_consumer(ISrsConnection* conn, SrsLiveConsumer*& consumer)
{
    srs_error_t err = srs_success;
    
    consumer = new SrsLiveConsumer(this, conn);
    consumers.push_back(consumer);
    if (conn != NULL) {
        conn->srsConsumer = consumer;
    }

    // There should be one consumer, so reset the timeout.
    stream_die_at_ = 0;
    publisher_idle_at_ = 0;
    //通过配置文件中的参数,判断是否是边缘服务器
    //如果是边缘服务器,则调用 play_edge进行拉流播放
    //SrsPlayEdge* play_edge;
    // for edge, when play edge stream, check the state
    if (_srs_config->get_vhost_is_edge(req->vhost)) {
        // notice edge to start for the first client.
        if ((err = play_edge->on_client_play()) != srs_success) {
            return srs_error_wrap(err, "play edge");
        }
    }
    
    return err;
}

SrsPlayEdge会通过SrsEdgeIngester进行拉流

srs_error_t SrsPlayEdge::on_client_play()
{
    srs_error_t err = srs_success;
    //SrsEdgeIngester ingester 启动一个新的协程去源站拉流
    // start ingest when init state.
    if (state == SrsEdgeStateInit) {
        state = SrsEdgeStatePlay;
        err = ingester->start();
    } else if (state == SrsEdgeStateIngestStopping) {
        return srs_error_new(ERROR_RTMP_EDGE_PLAY_STATE, "state is stopping");
    }

    
    return err;
}

拉流类SrsEdgeIngester

SrsEdgeIngester会启动一个协程SrsSTCoroutine进行拉流处理 

srs_error_t SrsEdgeIngester::start()
{
    srs_error_t err = srs_success;
    
    if ((err = source->on_publish()) != srs_success) {
        return srs_error_wrap(err, "notify source");
    }
    
    srs_freep(trd);
    trd = new SrsSTCoroutine("edge-igs", this);
    
    if ((err = trd->start()) != srs_success) {
        return srs_error_wrap(err, "coroutine");
    }
    
    return err;
}

真正拉流类 SrsEdgeUpstream

协程会有一个while循环不停的去拉流,目前边缘回源拉流支持两种协议rtmp和flv,根据配置参数创建对应的拉流对象

srs_error_t SrsEdgeIngester::do_cycle()
{
     while (true) {
        if ((err = trd->pull()) != srs_success) {
            return srs_error_wrap(err, "do cycle pull");
        }

        // Use protocol in config.
        string edge_protocol = _srs_config->get_vhost_edge_protocol(req->vhost);

        // If follow client protocol, change to protocol of client.
        bool follow_client = _srs_config->get_vhost_edge_follow_client(req->vhost);
        if (follow_client && !req->protocol.empty()) {
            edge_protocol = req->protocol;
        }

        // Create object by protocol.
        srs_freep(upstream);
        //根据边缘协议创建对应的拉流类
        if (edge_protocol == "flv" || edge_protocol == "flvs") {
            upstream = new SrsEdgeFlvUpstream(edge_protocol == "flv"? "http" : "https");
        } else {
            upstream = new SrsEdgeRtmpUpstream(redirect);
        }
        
        if ((err = source->on_source_id_changed(_srs_context->get_id())) != srs_success) {
            return srs_error_wrap(err, "on source id changed");
        }
        //边缘服务连接源站服务,一般源站会部署多个节点,边缘选取源站节点时也是通过RoundRobin算法选取
        //其中一个节点进行拉流
        //这里需要注意一点,如果负载到一台没有流的源站节点上怎么办?
        //其实如果发现连接的源站没有流,会触发302 redirect重连逻辑
        if ((err = upstream->connect(req, lb)) != srs_success) {
            return srs_error_wrap(err, "connect upstream");
        }
        
        if ((err = edge->on_ingest_play()) != srs_success) {
            return srs_error_wrap(err, "notify edge play");
        }

        // set to larger timeout to read av data from origin.
        upstream->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT);
        //拉流处理函数
        err = ingest(redirect);
        
        if (srs_is_client_gracefully_close(err)) {
            srs_warn("origin disconnected, retry, error %s", srs_error_desc(err).c_str());
            srs_error_reset(err);
        }
        break;
    }
    
}

拉流源站没有流触发302

边缘服务通过负载均衡获取源站节点 ,然后去源站拉流,如果当前源站节点没有流,会触发320 redirect 重定向另一台。srs目前会重试三次,如果三次之后还是拉不到流,就认为失败了

srs_error_t SrsEdgeFlvUpstream::do_connect(SrsRequest* r, SrsLbRoundRobin* lb, int redirect_depth)
{
    //第一次连接源站节点时 redirect_depth = 0,通过lb->select负载均衡随机选择一台
    //如果连接的源站没有流,触发302,再连接另一台
     if (redirect_depth == 0) {
        SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost);

        // @see https://github.com/ossrs/srs/issues/79
        // when origin is error, for instance, server is shutdown,
        // then user remove the vhost then reload, the conf is empty.
        if (!conf) {
            return srs_error_new(ERROR_EDGE_VHOST_REMOVED, "vhost %s removed", req->vhost.c_str());
        }

        // select the origin.
        std::string server = lb->select(conf->args);
        int port = SRS_DEFAULT_HTTP_PORT;
        if (schema_ == "https") {
            port = SRS_DEFAULT_HTTPS_PORT;
        }
        srs_parse_hostport(server, server, port);

        // Remember the current selected server.
        selected_ip = server;
        selected_port = port;
    } else {
        // If HTTP redirect, use the server in location.
        schema_ = req->schema;
        selected_ip = req->host;
        selected_port = req->port;
    }
    
    sdk_ = new SrsHttpClient();
    if ((err = sdk_->initialize(schema_, selected_ip, selected_port, cto)) != srs_success) {
        return srs_error_wrap(err, "edge pull %s failed, cto=%dms.", url.c_str(), srsu2msi(cto));
    }
    if ((err = sdk_->get(path, "", &hr_)) != srs_success) {
        return srs_error_wrap(err, "edge get %s failed, path=%s", url.c_str(), path.c_str());
    }

    if (hr_->status_code() == 404) {
        return srs_error_new(ERROR_RTMP_STREAM_NOT_FOUND, "Connect to %s, status=%d", url.c_str(), hr_->status_code());
    }
    if ((err = sdk_->get(path, "", &hr_)) != srs_success) {
        return srs_error_wrap(err, "edge get %s failed, path=%s", url.c_str(), path.c_str());
    }

    if (hr_->status_code() == 404) {
        return srs_error_new(ERROR_RTMP_STREAM_NOT_FOUND, "Connect to %s, status=%d", url.c_str(), hr_->status_code());
    }
  
    //如果状态码为302,开启重连另一台逻辑
    string location;
    if (hr_->status_code() == 302) {
        //获取302返回的地址
        location = hr_->header()->get("Location");
    }
    srs_trace("Edge: Connect to %s ok, status=%d, location=%s", url.c_str(), hr_->status_code(), location.c_str());

    if (hr_->status_code() == 302) {
        //最多重试三次
        if (redirect_depth >= 3) {
            return srs_error_new(ERROR_HTTP_302_INVALID, "redirect to %s fail, depth=%d", location.c_str(), redirect_depth);
        }

        string app;
        string stream_name;
        if (true) {
            string tcUrl;
            srs_parse_rtmp_url(location, tcUrl, stream_name);

            int port;
            string schema, host, vhost, param;
            srs_discovery_tc_url(tcUrl, schema, host, vhost, app, stream_name, port, param);

            r->schema = schema; r->host = host; r->port = port;
            r->app = app; r->stream = stream_name; r->param = param;
        }
        //重连
        return do_connect(r, lb, redirect_depth + 1);
    }
}

回源拉流的逻辑

边缘节点连接源站成功后,即找到有流的源站,然后就开始通过upstream进行拉流

srs_error_t SrsEdgeIngester::ingest(string& redirect)
{
    while (true) {
        if ((err = trd->pull()) != srs_success) {
            return srs_error_wrap(err, "thread quit");
        }
        
        pprint->elapse();
        
        // pithy print
        if (pprint->can_print()) {
            upstream->kbps_sample(SRS_CONSTS_LOG_EDGE_PLAY, pprint->age());
        }
        
        // read from client.
        SrsCommonMessage* msg = NULL;
        //upstream拉流
        if ((err = upstream->recv_message(&msg)) != srs_success) {
            return srs_error_wrap(err, "recv message");
        }
        
        srs_assert(msg);
        SrsAutoFree(SrsCommonMessage, msg);
        //处理拉到的流
        if ((err = process_publish_message(msg, redirect)) != srs_success) {
            return srs_error_wrap(err, "process message");
        }
    }
}

处理拉到的流,拉到流后和普通单节点就一样了,把流转给 SrsLiveSource ,然后再转给对应的consumer

srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg, string& redirect)
{
    srs_error_t err = srs_success;
    
    // process audio packet
    if (msg->header.is_audio()) {
        if ((err = source->on_audio(msg)) != srs_success) {
            return srs_error_wrap(err, "source consume audio");
        }
    }
    
    // process video packet
    if (msg->header.is_video()) {
        if ((err = source->on_video(msg)) != srs_success) {
            return srs_error_wrap(err, "source consume video");
        }
    }
   }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/400050.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【保真】揭秘目前唯一能使用Sora的官方渠道 —— OpenAI Red Teaming Network

原文链接:【保真】揭秘目前唯一能使用Sora的官方渠道 —— OpenAI Red Teaming Network 前几天OpenAI推出的Sora模型着实太火了,不仅让圈内人热血封腾,也给圈外人点了一把AGI的热情之火。 Sora的大火,也有不少小伙伴开始问一个问…

2.网络游戏逆向分析与漏洞攻防-游戏启动流程漏洞-项目搭建

内容参考于:易道云信息技术研究院VIP课 上一个内容:1.网络游戏逆向分析与漏洞攻防-游戏启动流程漏洞-测试需求与需求拆解-CSDN博客 代码以提交到码云:https://gitee.com/dye_your_fingers/titan 开始搭建环境 创建之前:HOOK引擎…

集成厂家服务的核心内容

一、关系定义 集成商是指将多个不同的产品或服务集成在一起,形成一个整体解决方案,满足客户需求的企业或个人。 厂家产品商是指生产和销售产品的企业或个人,他们制造和提供各种产品,供集成商使用。 客户关系是指集成商和厂家产…

爬虫在网页抓取的过程中可能会遇到哪些问题?

在网页抓取(爬虫)过程中,开发者可能会遇到多种问题,以下是一些常见问题及其解决方案: 1. IP封锁: 问题:封IP是最常见的问题,抓取的目标网站会识别并封锁频繁请求的IP地址。 解决方案…

git push 使用 --mirror 参数复制仓库

迁移一个 Git 仓库并且保留原有的提交记录和分支 克隆原始仓库到本地 git clone <原始仓库URL> <新仓库目录>添加新的远程仓库&#xff1a;git remote add new-origin <新仓库URL>推送所有分支和标签到新的远程仓库&#xff1a;git push new-origin --mirro…

Vue封装全局公共方法

有的时候,我们需要在多个组件里调用一个公共方法,这样我们就能将这个方法封装成全局的公共方法。 我们先在src下的assets里新建一个js文件夹,然后建一个common.js的文件,如下图所示: 然后在common.js里写我们的公共方法,比如这里我们写了一个testLink的方法,然后在main…

计算机设计大赛 深度学习动物识别 - 卷积神经网络 机器视觉 图像识别

文章目录 0 前言1 背景2 算法原理2.1 动物识别方法概况2.2 常用的网络模型2.2.1 B-CNN2.2.2 SSD 3 SSD动物目标检测流程4 实现效果5 部分相关代码5.1 数据预处理5.2 构建卷积神经网络5.3 tensorflow计算图可视化5.4 网络模型训练5.5 对猫狗图像进行2分类 6 最后 0 前言 &#…

微信小程序uniapp校园租房指南房屋租赁系统java+python+nodejs+php

语言&#xff1a;javapythonnodejsphp均支持 框架支持:Ssm/django/flask/thinkphp/springboot/express均支持 运行软件:idea/eclipse/vscode/pycharm/wamp均支持 数据库 mysql 数据库工具&#xff1a;Navicat等 前端开发:vue 小程序端运行软件 微信开发者工具/hbuiderx uni-…

金三银四即将到来,该准备简历和面试了!

一直以来找讲师帮忙看简历的人很多&#xff0c;但是很少会有人问讲师&#xff1a;根据简历该如何准备面试&#xff1f; 还有一些人简历是达标的&#xff0c;但是面试不通过的&#xff0c;却简单地认为是简历问题&#xff0c;不会认为是自己的掌握问题。 一年一度的金三银四即…

使用AndroidStudio调试Framework

1.前言 最近在工作过程中&#xff0c;涉及到FW的一些修改&#xff0c;比如PhoneWindowManager&#xff0c;只能通过加日志看打印的方式查看一些内容&#xff0c;比较低效&#xff0c;所以想了解一下FW的调试方式&#xff0c;后来发现AS就可以调试FW.我平时都是在Docker服务器编…

linux ext3/ext4文件系统(part2 jbd2)

概述 jbd2&#xff08;journal block device 2&#xff09;是为块存储设计的 wal 机制&#xff0c;它为要写设备的buffer绑定了一个journal_head&#xff0c;这个journal_head与一个transaction绑定&#xff0c;随着事务状态的转移&#xff08;运行&#xff0c;生成日志&#…

Kubernetes kubeadm 证书到期,更新证书

1.环境说明 lient Version: version.Info{Major:"1", Minor:"19", GitVersion:"v1.19.6", GitCommit:"fbf646b339dc52336b55d8ec85c181981b86331a", GitTreeState:"clean", BuildDate:"2020-12-18T12:09:30Z", G…

多目图像拼接算法

图像拼接一般要经过图像特征提取、特征匹配、融合等步骤来实现。 特征匹配与变换: SIFT(尺度不变特征变换)SURF(加速鲁棒特征)ORB(Oriented FAST and Rotated BRIEF)AKAZE(加速的KAZE特征)全景图像拼接算法: 基于特征匹配的拼接:利用特征点匹配找到重叠区域,然后进…

【C++】初始化列表、static成员、友元、匿名对象、附练习题

文章目录 前言一、构造函数【初始化列表】1.1 构造函数体赋值1.2 初始化列表1.3 explicit关键字 二、static成员2.1 概念2.2 特性 三、友元3.1 友元函数3.2 内部类 四、匿名对象4.1 拷贝对象时的一些编译器优化 五、再次理解类和对象六、练习题6.1 求123...n&#xff0c;要求不…

读书笔记-增强型分析:AI驱动的数据分析、业务决策与案例实践

目录 前言 运用人工智能技术&#xff0c;可以使人类社会变得更美好。人们总是期待产品更适合、服务更贴心、生活更便利。在实践中&#xff0c;技术给企业赋能&#xff0c;企业通过优质的产品和服务满足社会&#xff0c;提升人类福祉。很多金融企业已经开始尝试向潜在客户推送…

搜维尔科技:OptiTrack探索人类与技术之间关系的开创性表演

另一种蓝色通过 OptiTrack 释放创造力 总部位于荷兰的当代舞蹈团因其探索人类与技术之间关系的开创性表演而受到广泛赞誉。该公司由富有远见的编舞家大卫米登多普创立&#xff0c;不仅利用技术作为探索的主题&#xff0c;而且将其作为表达故事的动态工具。 “我一直对文化与…

Kubernetes(K8s)的基础概念

K8s的概念 K8S 的全称为 Kubernetes (K12345678S) &#xff08;简化全称&#xff09; Kubernetes 是一个可移植、可扩展的开源平台&#xff0c;用于 管理容器化工作负载和服务&#xff0c;有助于声明式配置和自动化。它拥有庞大且快速发展的生态系统。Kubernetes 服务、支持和…

CQT新里程碑:SOC 2 数据安全认证通过,加强其人工智能支持

Covalent Network&#xff08;CQT&#xff09;发展新里程碑&#xff1a;SOC 2 数据安全认证通过&#xff0c;进一步加强了其人工智能支持 Covalent Network&#xff08;CQT&#xff09;现已完成并通过了严格的 Service Organization Control&#xff08;SOC) 2 Type II 的合规性…

《图解HTTP》笔记1:http的诞生

1&#xff0c;http的诞生&#xff1a; 1.1 为共享知识而生 我们现在使用web&#xff08;World Wide Web的简称&#xff0c;即万维网&#xff09;浏览器&#xff0c;目前可以输入一个网址&#xff08;http://www.baidu.com)&#xff0c;就会有一个网页显示出来。 最开始设想出…

SpringSecurity安全框架

我们使用这个springSecurity安全框架,作用是认证,授权,将用户的权限和对应的资源进行绑定,默认的是在内存中保存的,实际开发中,是需要根据项目业务的需求对某些方法进行重写,使数据库中权限对应的资源进行绑定,就是查看当前登录的用户所扮演的角色,该角色有哪些权限 授权 1内…