Python GStreamer client for minimal-webrtc
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

364 lines
14 KiB

  1. #!/usr/bin/python3
  2. import random
  3. import ssl
  4. import string
  5. import websockets
  6. import asyncio
  7. import os
  8. import sys
  9. import json
  10. import argparse
  11. import qrcode
  12. import gi
  13. gi.require_version('Gst', '1.0')
  14. from gi.repository import Gst
  15. gi.require_version('GstWebRTC', '1.0')
  16. from gi.repository import GstWebRTC
  17. gi.require_version('GstSdp', '1.0')
  18. from gi.repository import GstSdp
# GStreamer pipeline fragments assembled in WebRTCClient.__init__.
# The webrtcbin element is named "sendrecv" so the media chains below can
# link to it by name and the code can look it up after Gst.parse_launch.
PIPELINE_START = 'webrtcbin name=sendrecv\n'
# Appended after a video source description: convert, VP8-encode, RTP-pay
# (payload type 97) and feed into the sendrecv webrtcbin.
PIPELINE_VIDEO_POSTFIX = ''' ! videoconvert ! queue !
vp8enc deadline=1 ! rtpvp8pay !
queue ! application/x-rtp,media=video,encoding-name=VP8,payload=97 !
sendrecv.
'''
# Appended after an audio source description: convert/resample, Opus-encode,
# RTP-pay (payload type 96) and feed into the sendrecv webrtcbin.
PIPELINE_AUDIO_POSTFIX = ''' ! audioconvert ! audioresample ! queue !
opusenc ! rtpopuspay !
queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 !
sendrecv.
'''
  30. class WebRTCClient:
  31. def __init__(self, args):
  32. self.conn = None
  33. self.pipe = None
  34. self.webrtc = None
  35. self.url = args.url
  36. self.has_offer = False
  37. self.is_host = True
  38. self.args = args
  39. if args.roomName is None:
  40. # From https://stackoverflow.com/a/2030081
  41. self.roomName = ''.join(random.choice(string.ascii_lowercase)
  42. for i in range(6))
  43. else:
  44. self.roomName = args.roomName
  45. qr = qrcode.QRCode()
  46. client_url = '#'.join([self.url, self.roomName])
  47. print(client_url)
  48. qr.add_data(client_url)
  49. qr.print_ascii(tty=True)
  50. self.server = 'ws' + self.url[4:] + 'ws/'\
  51. + ('host' if self.is_host else 'client') + '/'\
  52. + self.roomName + '/'
  53. falseStrings = ['false', 'null', 'none', 'no']
  54. testStrings = ['test']
  55. audioPipeline = self.args.sendAudio
  56. if audioPipeline.lower() in falseStrings:
  57. self.sendAudio = False
  58. audioPipeline = 'audiotestsrc wave=silence'
  59. elif audioPipeline.lower() in testStrings:
  60. self.sendAudio = True
  61. audioPipeline = 'audiotestsrc wave=red-noise'
  62. else:
  63. self.sendAudio = True
  64. videoPipeline = self.args.sendVideo
  65. if videoPipeline.lower() in falseStrings:
  66. self.sendVideo = False
  67. videoPipeline = 'videotestsrc pattern=solid-color'
  68. elif videoPipeline.lower() in testStrings:
  69. self.sendVideo = True
  70. videoPipeline = 'videotestsrc pattern=ball'
  71. enableAudio = self.sendAudio or self.args.receiveAudio
  72. enableVideo = self.sendVideo or self.args.receiveVideo != 'false'
  73. if not (enableAudio or enableVideo):
  74. print('Must enable audio or video.')
  75. sys.exit()
  76. self.pipeline = PIPELINE_START
  77. if enableAudio:
  78. self.pipeline += audioPipeline + PIPELINE_AUDIO_POSTFIX
  79. if enableVideo:
  80. self.pipeline += videoPipeline + PIPELINE_VIDEO_POSTFIX
  81. async def connect(self):
  82. sslctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
  83. self.conn = await websockets.connect(self.server, ssl=sslctx)
  84. if not self.is_host:
  85. await self.conn.send('{"ready": "separateIce"}')
  86. self.start_pipeline()
  87. def send_sdp_offer(self, offer):
  88. if not self.is_host and not self.has_offer:
  89. pass
  90. text = offer.sdp.as_text()
  91. print('Sending offer:\n%s' % text)
  92. msg = json.dumps({'description': {'type': 'offer', 'sdp': text}})
  93. loop = asyncio.new_event_loop()
  94. loop.run_until_complete(self.conn.send(msg))
  95. def on_offer_created(self, promise, _, __):
  96. print('In on_offer_created...')
  97. promise.wait()
  98. reply = promise.get_reply()
  99. offer = reply.get_value('offer')
  100. promise = Gst.Promise.new()
  101. self.webrtc.emit('set-local-description', offer, promise)
  102. promise.interrupt()
  103. self.send_sdp_offer(offer)
  104. def on_negotiation_needed(self, element):
  105. print('In on_negotiation_needed...')
  106. promise = Gst.Promise.new_with_change_func(self.on_offer_created,
  107. element, None)
  108. element.emit('create-offer', None, promise)
  109. def send_ice_candidate_message(self, _, mlineindex, candidate):
  110. if not self.is_host and not self.has_offer:
  111. pass
  112. icemsg = json.dumps({'candidate': candidate,
  113. 'sdpMLineIndex': mlineindex})
  114. loop = asyncio.new_event_loop()
  115. loop.run_until_complete(self.conn.send(icemsg))
  116. def on_incoming_decodebin_stream(self, _, pad):
  117. print('In on_incoming_decodebin_stream...')
  118. if not pad.has_current_caps():
  119. print(pad, 'has no caps, ignoring')
  120. return
  121. caps = pad.get_current_caps()
  122. assert caps.get_size()
  123. s = caps.get_structure(0)
  124. name = s.get_name()
  125. if name.startswith('video'):
  126. print("Connecting incoming video stream...")
  127. q = Gst.ElementFactory.make('queue')
  128. conv = Gst.ElementFactory.make('videoconvert')
  129. if self.args.receiveVideoTo == 'auto':
  130. print('Displaying video to screen using autovideosink.')
  131. sink = Gst.ElementFactory.make('autovideosink')
  132. self.pipe.add(q)
  133. self.pipe.add(conv)
  134. self.pipe.add(sink)
  135. self.pipe.sync_children_states()
  136. pad.link(q.get_static_pad('sink'))
  137. q.link(conv)
  138. conv.link(sink)
  139. else:
  140. print('Sending video to v4l2 device %s.'
  141. % self.args.receiveVideoTo)
  142. caps = Gst.Caps.from_string("video/x-raw,format=YUY2")
  143. capsfilter = Gst.ElementFactory.make("capsfilter", "vfilter")
  144. capsfilter.set_property("caps", caps)
  145. sink = Gst.ElementFactory.make('v4l2sink')
  146. sink.set_property('device', self.args.receiveVideoTo)
  147. self.pipe.add(q)
  148. self.pipe.add(conv)
  149. self.pipe.add(capsfilter)
  150. self.pipe.add(sink)
  151. self.pipe.sync_children_states()
  152. pad.link(q.get_static_pad('sink'))
  153. q.link(conv)
  154. conv.link(capsfilter)
  155. capsfilter.link(sink)
  156. elif name.startswith('audio'):
  157. print("Connecting incoming audio stream...")
  158. q = Gst.ElementFactory.make('queue')
  159. conv = Gst.ElementFactory.make('audioconvert')
  160. resample = Gst.ElementFactory.make('audioresample')
  161. if self.args.receiveAudioTo == 'auto':
  162. print('Playing audio using autoaudiosink.')
  163. sink = Gst.ElementFactory.make('autoaudiosink')
  164. self.pipe.add(q, conv, resample, sink)
  165. self.pipe.sync_children_states()
  166. pad.link(q.get_static_pad('sink'))
  167. q.link(conv)
  168. conv.link(resample)
  169. resample.link(sink)
  170. elif self.args.receiveAudioTo.startswith('device='):
  171. device = self.args.receiveAudioTo[len('device='):]
  172. print('Playing audio using pulseaudio device %s.' % device)
  173. sink = Gst.ElementFactory.make('pulsesink')
  174. sink.set_property('device', device)
  175. self.pipe.add(q, conv, resample, sink)
  176. self.pipe.sync_children_states()
  177. pad.link(q.get_static_pad('sink'))
  178. q.link(conv)
  179. conv.link(resample)
  180. resample.link(sink)
  181. else:
  182. print('Sending audio to file %s.' % self.args.receiveAudioTo)
  183. caps = Gst.Caps.from_string(
  184. "audio/x-raw,format=S16LE,channels=1")
  185. capsfilter = Gst.ElementFactory.make("capsfilter", "afilter")
  186. capsfilter.set_property("caps", caps)
  187. sink = Gst.ElementFactory.make('filesink')
  188. sink.set_property('location', self.args.receiveAudioTo)
  189. sink.set_property('sync', 'true')
  190. self.pipe.add(q)
  191. self.pipe.add(conv)
  192. self.pipe.add(resample)
  193. self.pipe.add(capsfilter)
  194. self.pipe.add(sink)
  195. self.pipe.sync_children_states()
  196. pad.link(q.get_static_pad('sink'))
  197. q.link(conv)
  198. conv.link(resample)
  199. resample.link(capsfilter)
  200. capsfilter.link(sink)
  201. def on_incoming_stream(self, _, pad):
  202. print('In on_incoming_stream...')
  203. if pad.direction != Gst.PadDirection.SRC:
  204. return
  205. decodebin = Gst.ElementFactory.make('decodebin')
  206. decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
  207. self.pipe.add(decodebin)
  208. decodebin.sync_state_with_parent()
  209. self.webrtc.link(decodebin)
  210. def on_data_channel_open(self):
  211. print('In on_data_channel_open...')
  212. def on_data_channel_message(self, msg):
  213. print('In on_data_channel_message...')
  214. print('Data channel message: %s' % msg)
  215. def on_data_channel(self, channel):
  216. print('In on_data_channel...')
  217. self.data_channel = channel
  218. channel.connect('on-open', self.on_data_channel_open)
  219. channel.connect('on-message-string', self.on_data_channel_message)
  220. def start_pipeline(self):
  221. print('In start_pipeline...')
  222. self.pipe = Gst.parse_launch(self.pipeline)
  223. self.webrtc = self.pipe.get_by_name('sendrecv')
  224. self.webrtc.connect('on-negotiation-needed',
  225. self.on_negotiation_needed)
  226. self.webrtc.connect('on-ice-candidate',
  227. self.send_ice_candidate_message)
  228. self.webrtc.connect('on-data-channel',
  229. self.on_data_channel)
  230. self.webrtc.connect('pad-added', self.on_incoming_stream)
  231. self.pipe.set_state(Gst.State.PLAYING)
  232. async def handle_sdp(self, msg):
  233. if not self.webrtc:
  234. self.start_pipeline()
  235. assert (self.webrtc)
  236. if 'description' in msg:
  237. print('connection-state=%s'
  238. % self.webrtc.get_property('connection-state'))
  239. self.has_offer = True
  240. sdp = msg['description']
  241. assert(sdp['type'] == 'answer')
  242. sdp = sdp['sdp']
  243. print('Received answer:\n%s' % (sdp))
  244. res, sdpmsg = GstSdp.SDPMessage.new()
  245. GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
  246. answer = GstWebRTC.WebRTCSessionDescription.new(
  247. GstWebRTC.WebRTCSDPType.ANSWER,
  248. sdpmsg)
  249. promise = Gst.Promise.new()
  250. self.webrtc.emit('set-remote-description', answer, promise)
  251. promise.interrupt()
  252. elif 'candidate' in msg:
  253. candidate = msg['candidate']
  254. sdpmlineindex = msg['sdpMLineIndex']
  255. self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
  256. async def loop(self):
  257. assert self.conn
  258. async for message in self.conn:
  259. msg = json.loads(message)
  260. if 'ready' in msg:
  261. self.start_pipeline()
  262. await self.conn.send(json.dumps({'settings': {
  263. 'separateIce': True,
  264. 'serverless': False,
  265. 'client-video': 'none' if self.args.receiveVideo == 'false' else self.args.receiveVideo,
  266. 'client-audio': self.args.receiveAudio,
  267. 'host-video': self.sendVideo,
  268. 'host-audio': self.sendAudio,
  269. 'debug': True,
  270. }}))
  271. else:
  272. await self.handle_sdp(msg)
  273. return 0
  274. def check_plugins():
  275. needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
  276. "rtpmanager", "videotestsrc", "audiotestsrc"]
  277. missing = list(filter(
  278. lambda p: Gst.Registry.get().find_plugin(p) is None, needed))
  279. if len(missing):
  280. print('Missing gstreamer plugins:', missing)
  281. return False
  282. return True
  283. if __name__ == '__main__':
  284. Gst.init(None)
  285. if not check_plugins():
  286. sys.exit(1)
  287. parser = argparse.ArgumentParser()
  288. parser.add_argument('--url', help='URL from minimal-webrtc',
  289. default='https://localhost/camera/')
  290. parser.add_argument('--roomName', help='room name to host')
  291. parser.add_argument('--sendAudio', default='test',
  292. help='GStreamer audio pipeline to send')
  293. parser.add_argument('--sendVideo', default='test',
  294. help='GStreamer video pipeline to send')
  295. parser.add_argument('--receiveAudio', action='store_true', default=None,
  296. help='Enable receiving audio')
  297. parser.add_argument('--receiveVideo', default=None,
  298. help='Set video to receive ("screen", '
  299. + '"environment", "facing", "true", "false")')
  300. parser.add_argument('--receiveAudioTo', default=None,
  301. help='"auto" or file path or device=DEVICE '
  302. + 'where DEVICE is a PulseAudio sink '
  303. + 'to send received audio to ')
  304. parser.add_argument('--receiveVideoTo', default=None,
  305. help='"auto" or file path to send received video to')
  306. args = parser.parse_args()
  307. # Support only one of receiveAudio/receiveAudioTo or
  308. # receiveVideo/receiveVideoTo while setting reasonable defaults.
  309. if args.receiveAudio is not None and args.receiveAudioTo is not None:
  310. pass
  311. elif args.receiveAudio is None and args.receiveAudioTo is None:
  312. args.receiveAudio = False
  313. elif args.receiveAudio is None:
  314. args.receiveAudio = True
  315. elif args.receiveAudioTo is None:
  316. args.receiveAudioTo = 'auto'
  317. if args.receiveVideo is not None and args.receiveVideoTo is not None:
  318. pass
  319. elif args.receiveVideo is None and args.receiveVideoTo is None:
  320. args.receiveVideo = False
  321. elif args.receiveVideo is None:
  322. args.receiveVideo = True
  323. elif args.receiveVideoTo is None:
  324. args.receiveVideoTo = 'auto'
  325. c = WebRTCClient(args)
  326. asyncio.get_event_loop().run_until_complete(c.connect())
  327. res = asyncio.get_event_loop().run_until_complete(c.loop())
  328. sys.exit(res)