Kurento
我们通常称它为Kurento media server
,简称为kms
,它是webrtc
下的媒体服务器。kms
在上一节中我们已经使用过,本节会对kms
的流程进行简单的介绍。
kms
是基于GStreamer
开发的,有关GStreamer
的开发可以查阅相应的资料。下面我们还是围绕两条主线来分析,一个是sdp
,另外一个是ice
。
sdp的生成
在上一个例子中,我们使用的是kms
中java
的api
,kms
和java的api
也是通过socket进行通讯的。通过JsonRpc
来进行通讯,比如processOffer
是通过反射的方式invoke
调用到目标函数。
voidSdpEndpointImpl::invoke (std::shared_ptr<MediaObjectImpl> obj, const std::string &methodName, const Json::Value ¶ms, Json::Value &response){...if (methodName == "processOffer") {kurento::JsonSerializer s (false);SdpEndpointMethodProcessOffer method;JsonSerializer responseSerializer (true);std::string ret;s.JsonValue = params;method.Serialize (s);ret = method.invoke (std::dynamic_pointer_cast<SdpEndpoint> (obj) );responseSerializer.SerializeNVP (ret);response = responseSerializer.JsonValue["ret"];return;}...SessionEndpointImpl::invoke (obj, methodName, params, response);}
具体看看调用的是哪个函数:
std::string SdpEndpointImpl::processOffer (const std::string &offer){GstSDPMessage *offerSdp = nullptr, *result = nullptr;std::string offerSdpStr;bool expected = false;if (offer.empty () ) {throw KurentoException (SDP_PARSE_ERROR, "Empty offer not valid");}offerSdp = str_to_sdp (offer);if (!pare_exchange_strong (expected, true) ) {//the endpoint is already negotiatedthrow KurentoException (SDP_END_POINT_ALREADY_NEGOTIATED,"Endpoint already negotiated");}g_signal_emit_by_name (element, "process-offer", sessId.c_str (), offerSdp,&result);gst_sdp_message_free (offerSdp);if (result == nullptr) {offerInProcess = false;throw KurentoException (SDP_END_POINT_PROCESS_OFFER_ERROR,"Error processing offer");}sdp_to_str (offerSdpStr, result);gst_sdp_message_free (result);try {MediaSessionStarted event (shared_from_this (),MediaSessionStarted::getName ());sigcSignalEmit(signalMediaSessionStarted, event);} catch (const std::bad_weak_ptr &e) {// shared_from_this()GST_ERROR ("BUG creating %s: %s", MediaSessionStarted::getName ().c_str (),e.what ());}GST_INFO("creating sdp: %s",offerSdpStr.c_str());return offerSdpStr;}
g_signal_emit_by_name (element, "process-offer"
调用了process-offer
的信号,其实是调用到了kms_sdp_session_process_offer
函数。接着调用到底层kms_sdp_agent_generate_answer
。
/*
 * Excerpt of the function that builds the SDP answer for a given offer.
 * Only the allocation of the answer message and the first two setup steps
 * are shown; the "...." / "..." markers stand for code elided by the
 * article. That elided code includes the declarations of `answer` and `o`
 * and the `end` label targeted by the `goto`s.
 *
 * On a failed allocation, `error` is set to SDP_AGENT_INVALID_STATE.
 * NOTE(review): what the `end` label does (presumably cleanup and an error
 * return) is not visible here; confirm against the full source.
 */
static GstSDPMessage *kms_sdp_agent_generate_answer (KmsSdpAgent * agent,const GstSDPMessage * offer, GError ** error){
....
GST_INFO("gst_sdp_message_new");
/* Allocate the empty SDP message that will become the answer. */
if (gst_sdp_message_new (&answer) != GST_SDP_OK) {
g_set_error_literal (error, KMS_SDP_AGENT_ERROR, SDP_AGENT_INVALID_STATE,"Can not allocate SDP answer");
goto end;
}
/* Fill in the default session-level attributes of the answer. */
if (!kms_sdp_agent_set_default_session_attributes (answer, error)) {
goto end;
}
/* Set the origin line of the answer from `o` (declared in elided code). */
if (!kms_sdp_agent_set_origin (answer, &o, error)) {
goto end;
}
...
return answer;
}
gst_sdp_message_new
是GStreamer
的底层生成sdp
函数了。
ice
ice的生成使用的是第三方库libnice
,可以参考这里。
对于kms
来说,它采用模块化的加载方式,会自动加载class_init
的函数。通过kms_ice_nice_agent_new_candidate_full
后,再到kms_ice_nice_agent_get_candidate_sdp_string
的函数。
/*
 * Builds the candidate attribute string for a local ICE candidate.
 *
 * libnice generates the SDP text for the candidate; its first
 * SDP_CANDIDATE_ATTR_LEN characters are skipped and the remainder is
 * appended to SDP_CANDIDATE_ATTR followed by ":".
 *
 * Returns the string built by g_strconcat(); the intermediate libnice
 * string is freed here.
 */
static char *
kms_ice_nice_agent_get_candidate_sdp_string (NiceAgent * agent,
    NiceCandidate * candidate)
{
  gchar *full_sdp = nice_agent_generate_local_candidate_sdp (agent, candidate);
  const gchar *after_prefix = full_sdp + SDP_CANDIDATE_ATTR_LEN;
  gchar *attr_line = g_strconcat (SDP_CANDIDATE_ATTR, ":", after_prefix, NULL);

  g_free (full_sdp);

  return attr_line;
}
nice_agent_generate_local_candidate_sdp
函数,就是libnice
的底层函数了。
生成的ice,通过socket发走:
sigc::connection conn = signalIceCandidateFound.connect ([ &, wh] (IceCandidateFound event) {//std::cout <<"android pp WebRtcEndpointImpl::connect IceCandidateFound event: "<<"signalIceCandidateFound " << std::endl;std::shared_ptr<EventHandler> lh = wh.lock();if (!lh)return;std::shared_ptr<IceCandidateFound> ev_ref (new IceCandidateFound(event));auto object = this->shared_from_this();lh->sendEventAsync ([ev_ref, object, lh] {JsonSerializer s (true);s.Serialize ("data", ev_ref.get());s.Serialize ("object", object.get());s.JsonValue["type"] = "IceCandidateFound";lh->sendEvent (s.JsonValue);});});
如何连接
创建通道,在这个通道中进行数据的连接。
/*
 * Called with a new source pad of the SSRC demuxer (presumably its
 * "new-ssrc-pad" signal -- confirm at the connection site).
 *
 * Decides which negotiated media (audio or video) the SSRC belongs to, then
 * links the new RTP pad, and the matching "rtcp_<pad name>" pad of the
 * demuxer, to sink pads requested from the session manager.
 *
 * If the SSRC matches neither audio nor video, the manager's custom SSRC
 * management gets a chance to handle the pad; when it does, nothing is
 * linked here. Otherwise a warning is logged and linking proceeds with
 * media == NULL.
 *
 * The whole operation runs with the SDP session lock held.
 */
static void
rtp_ssrc_demux_new_ssrc_pad (GstElement * ssrcdemux, guint ssrc, GstPad * pad,
    KmsBaseRtpSession * self)
{
  GST_INFO("rtp_ssrc_demux_new_ssrc_pad");
  const gchar *rtp_pad_name = GST_OBJECT_NAME (pad);
  gchar *rtcp_pad_name;
  const GstSDPMedia *media;
  GstPad *src, *sink;

  GST_INFO_OBJECT (self, "pad: %" GST_PTR_FORMAT " ssrc: %" G_GUINT32_FORMAT,
      pad, ssrc);

  KMS_SDP_SESSION_LOCK (self);

  /* Pick the negotiated media whose remote (or mapped local) SSRC matches. */
  if (self->remote_audio_ssrc == ssrc
      || ssrcs_are_mapped (ssrcdemux, self->local_audio_ssrc, ssrc)) {
    media = self->audio_neg;
  } else if (self->remote_video_ssrc == ssrc
      || ssrcs_are_mapped (ssrcdemux, self->local_video_ssrc, ssrc)) {
    media = self->video_neg;
  } else {
    if (kms_i_rtp_session_manager_custom_ssrc_management (self->manager, self,
            ssrcdemux, ssrc, pad)) {
      /* The manager took care of this pad; skip the default linking. */
      goto end;
    } else {
      GST_WARNING_OBJECT (pad,
          "Ignoring media from non-matching SSRC: %" G_GUINT32_FORMAT, ssrc);
      media = NULL;
    }
  }

  /* RTP */
  sink =
      kms_i_rtp_session_manager_request_rtp_sink (self->manager, self, media);
  kms_base_rtp_session_link_pads (pad, sink);
  g_object_unref (sink);

  /* RTCP */
  rtcp_pad_name = g_strconcat ("rtcp_", rtp_pad_name, NULL);
  src = gst_element_get_static_pad (ssrcdemux, rtcp_pad_name);
  g_free (rtcp_pad_name);
  sink =
      kms_i_rtp_session_manager_request_rtcp_sink (self->manager, self, media);
  kms_base_rtp_session_link_pads (src, sink);
  g_object_unref (src);
  g_object_unref (sink);

end:
  KMS_SDP_SESSION_UNLOCK (self);
}
完成了数据的交换。