#!/usr/bin/python3 import random import ssl import string import websockets import asyncio import os import sys import json import argparse import qrcode import gi gi.require_version('Gst', '1.0') from gi.repository import Gst gi.require_version('GstWebRTC', '1.0') from gi.repository import GstWebRTC gi.require_version('GstSdp', '1.0') from gi.repository import GstSdp PIPELINE_START = 'webrtcbin name=sendrecv\n' PIPELINE_VIDEO_POSTFIX = ''' ! videoconvert ! queue ! vp8enc deadline=1 ! rtpvp8pay ! queue ! application/x-rtp,media=video,encoding-name=VP8,payload=97 ! sendrecv. ''' PIPELINE_AUDIO_POSTFIX = ''' ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay ! queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv. ''' class WebRTCClient: def __init__(self, args): self.conn = None self.pipe = None self.webrtc = None self.url = args.url self.has_offer = False self.is_host = True self.args = args if args.roomName is None: # From https://stackoverflow.com/a/2030081 self.roomName = ''.join(random.choice(string.ascii_lowercase) for i in range(6)) else: self.roomName = args.roomName qr = qrcode.QRCode() client_url = '#'.join([self.url, self.roomName]) print(client_url) qr.add_data(client_url) qr.print_ascii(tty=True) self.server = 'ws' + self.url[4:] + 'ws/'\ + ('host' if self.is_host else 'client') + '/'\ + self.roomName + '/' falseStrings = ['false', 'null', 'none', 'no'] testStrings = ['test'] audioPipeline = self.args.sendAudio if audioPipeline.lower() in falseStrings: self.sendAudio = False audioPipeline = 'audiotestsrc wave=silence' elif audioPipeline.lower() in testStrings: self.sendAudio = True audioPipeline = 'audiotestsrc wave=red-noise' else: self.sendAudio = True videoPipeline = self.args.sendVideo if videoPipeline.lower() in falseStrings: self.sendVideo = False videoPipeline = 'videotestsrc pattern=solid-color' elif videoPipeline.lower() in testStrings: self.sendVideo = True videoPipeline = 'videotestsrc pattern=ball' enableAudio = self.sendAudio or self.args.receiveAudio enableVideo = self.sendVideo or self.args.receiveVideo != 'false' if not (enableAudio or enableVideo): print('Must enable audio or video.') sys.exit() self.pipeline = PIPELINE_START if enableAudio: self.pipeline += audioPipeline + PIPELINE_AUDIO_POSTFIX if enableVideo: self.pipeline += videoPipeline + PIPELINE_VIDEO_POSTFIX async def connect(self): sslctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH) self.conn = await websockets.connect(self.server, ssl=sslctx) if not self.is_host: await self.conn.send('{"ready": "separateIce"}') self.start_pipeline() def send_sdp_offer(self, offer): if not self.is_host and not self.has_offer: pass text = offer.sdp.as_text() print('Sending offer:\n%s' % text) msg = json.dumps({'description': {'type': 'offer', 'sdp': text}}) loop = asyncio.new_event_loop() loop.run_until_complete(self.conn.send(msg)) def on_offer_created(self, promise, _, __): print('In on_offer_created...') promise.wait() reply = promise.get_reply() offer = reply.get_value('offer') promise = Gst.Promise.new() self.webrtc.emit('set-local-description', offer, promise) promise.interrupt() self.send_sdp_offer(offer) def on_negotiation_needed(self, element): print('In on_negotiation_needed...') promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None) element.emit('create-offer', None, promise) def send_ice_candidate_message(self, _, mlineindex, candidate): if not self.is_host and not self.has_offer: pass icemsg = json.dumps({'candidate': candidate, 'sdpMLineIndex': mlineindex}) loop = asyncio.new_event_loop() loop.run_until_complete(self.conn.send(icemsg)) def on_incoming_decodebin_stream(self, _, pad): print('In on_incoming_decodebin_stream...') if not pad.has_current_caps(): print(pad, 'has no caps, ignoring') return caps = pad.get_current_caps() assert caps.get_size() s = caps.get_structure(0) name = s.get_name() if name.startswith('video'): print("Connecting incoming video stream...") q = Gst.ElementFactory.make('queue') conv = Gst.ElementFactory.make('videoconvert') if self.args.receiveVideoTo == 'auto': print('Displaying video to screen using autovideosink.') sink = Gst.ElementFactory.make('autovideosink') self.pipe.add(q) self.pipe.add(conv) self.pipe.add(sink) self.pipe.sync_children_states() pad.link(q.get_static_pad('sink')) q.link(conv) conv.link(sink) else: print('Sending video to v4l2 device %s.' % self.args.receiveVideoTo) caps = Gst.Caps.from_string("video/x-raw,format=YUY2") capsfilter = Gst.ElementFactory.make("capsfilter", "vfilter") capsfilter.set_property("caps", caps) sink = Gst.ElementFactory.make('v4l2sink') sink.set_property('device', self.args.receiveVideoTo) self.pipe.add(q) self.pipe.add(conv) self.pipe.add(capsfilter) self.pipe.add(sink) self.pipe.sync_children_states() pad.link(q.get_static_pad('sink')) q.link(conv) conv.link(capsfilter) capsfilter.link(sink) elif name.startswith('audio'): print("Connecting incoming audio stream...") q = Gst.ElementFactory.make('queue') conv = Gst.ElementFactory.make('audioconvert') resample = Gst.ElementFactory.make('audioresample') if self.args.receiveAudioTo == 'auto': print('Playing audio using autoaudiosink.') sink = Gst.ElementFactory.make('autoaudiosink') self.pipe.add(q, conv, resample, sink) self.pipe.sync_children_states() pad.link(q.get_static_pad('sink')) q.link(conv) conv.link(resample) resample.link(sink) elif self.args.receiveAudioTo.startswith('device='): device = self.args.receiveAudioTo[len('device='):] print('Playing audio using pulseaudio device %s.' % device) sink = Gst.ElementFactory.make('pulsesink') sink.set_property('device', device) self.pipe.add(q, conv, resample, sink) self.pipe.sync_children_states() pad.link(q.get_static_pad('sink')) q.link(conv) conv.link(resample) resample.link(sink) else: print('Sending audio to file %s.' % self.args.receiveAudioTo) caps = Gst.Caps.from_string( "audio/x-raw,format=S16LE,channels=1") capsfilter = Gst.ElementFactory.make("capsfilter", "afilter") capsfilter.set_property("caps", caps) sink = Gst.ElementFactory.make('filesink') sink.set_property('location', self.args.receiveAudioTo) sink.set_property('sync', 'true') self.pipe.add(q) self.pipe.add(conv) self.pipe.add(resample) self.pipe.add(capsfilter) self.pipe.add(sink) self.pipe.sync_children_states() pad.link(q.get_static_pad('sink')) q.link(conv) conv.link(resample) resample.link(capsfilter) capsfilter.link(sink) def on_incoming_stream(self, _, pad): print('In on_incoming_stream...') if pad.direction != Gst.PadDirection.SRC: return decodebin = Gst.ElementFactory.make('decodebin') decodebin.connect('pad-added', self.on_incoming_decodebin_stream) self.pipe.add(decodebin) decodebin.sync_state_with_parent() self.webrtc.link(decodebin) def on_data_channel_open(self): print('In on_data_channel_open...') def on_data_channel_message(self, msg): print('In on_data_channel_message...') print('Data channel message: %s' % msg) def on_data_channel(self, channel): print('In on_data_channel...') self.data_channel = channel channel.connect('on-open', self.on_data_channel_open) channel.connect('on-message-string', self.on_data_channel_message) def start_pipeline(self): print('In start_pipeline...') self.pipe = Gst.parse_launch(self.pipeline) self.webrtc = self.pipe.get_by_name('sendrecv') self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed) self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message) self.webrtc.connect('on-data-channel', self.on_data_channel) self.webrtc.connect('pad-added', self.on_incoming_stream) self.pipe.set_state(Gst.State.PLAYING) async def handle_sdp(self, msg): if not self.webrtc: self.start_pipeline() assert (self.webrtc) if 'description' in msg: print('connection-state=%s' % self.webrtc.get_property('connection-state')) self.has_offer = True sdp = msg['description'] assert(sdp['type'] == 'answer') sdp = sdp['sdp'] print('Received answer:\n%s' % (sdp)) res, sdpmsg = GstSdp.SDPMessage.new() GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg) answer = GstWebRTC.WebRTCSessionDescription.new( GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg) promise = Gst.Promise.new() self.webrtc.emit('set-remote-description', answer, promise) promise.interrupt() elif 'candidate' in msg: candidate = msg['candidate'] sdpmlineindex = msg['sdpMLineIndex'] self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate) async def loop(self): assert self.conn async for message in self.conn: msg = json.loads(message) if 'ready' in msg: self.start_pipeline() await self.conn.send(json.dumps({'settings': { 'separateIce': True, 'serverless': False, 'client-video': 'none' if self.args.receiveVideo == 'false' else self.args.receiveVideo, 'client-audio': self.args.receiveAudio, 'host-video': self.sendVideo, 'host-audio': self.sendAudio, 'debug': True, }})) else: await self.handle_sdp(msg) return 0 def check_plugins(): needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp", "rtpmanager", "videotestsrc", "audiotestsrc"] missing = list(filter( lambda p: Gst.Registry.get().find_plugin(p) is None, needed)) if len(missing): print('Missing gstreamer plugins:', missing) return False return True if __name__ == '__main__': Gst.init(None) if not check_plugins(): sys.exit(1) parser = argparse.ArgumentParser() parser.add_argument('--url', help='URL from minimal-webrtc', default='https://localhost/camera/') parser.add_argument('--roomName', help='room name to host') parser.add_argument('--sendAudio', default='test', help='GStreamer audio pipeline to send') parser.add_argument('--sendVideo', default='test', help='GStreamer video pipeline to send') parser.add_argument('--receiveAudio', action='store_true', default=None, help='Enable receiving audio') parser.add_argument('--receiveVideo', default=None, help='Set video to receive ("screen", ' + '"environment", "facing", "true", "false")') parser.add_argument('--receiveAudioTo', default=None, help='"auto" or file path or device=DEVICE ' + 'where DEVICE is a PulseAudio sink ' + 'to send received audio to ') parser.add_argument('--receiveVideoTo', default=None, help='"auto" or file path to send received video to') args = parser.parse_args() # Support only one of receiveAudio/receiveAudioTo or # receiveVideo/receiveVideoTo while setting reasonable defaults. if args.receiveAudio is not None and args.receiveAudioTo is not None: pass elif args.receiveAudio is None and args.receiveAudioTo is None: args.receiveAudio = False elif args.receiveAudio is None: args.receiveAudio = True elif args.receiveAudioTo is None: args.receiveAudioTo = 'auto' if args.receiveVideo is not None and args.receiveVideoTo is not None: pass elif args.receiveVideo is None and args.receiveVideoTo is None: args.receiveVideo = False elif args.receiveVideo is None: args.receiveVideo = True elif args.receiveVideoTo is None: args.receiveVideoTo = 'auto' c = WebRTCClient(args) asyncio.get_event_loop().run_until_complete(c.connect()) res = asyncio.get_event_loop().run_until_complete(c.loop()) sys.exit(res)