Python GStreamer client for minimal-webrtc
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

205 lines
7.8 KiB

  1. #!/usr/bin/python3
  2. import random
  3. import ssl
  4. import websockets
  5. import asyncio
  6. import os
  7. import sys
  8. import json
  9. import argparse
  10. import time
  11. import gi
  12. gi.require_version('Gst', '1.0')
  13. from gi.repository import Gst
  14. gi.require_version('GstWebRTC', '1.0')
  15. from gi.repository import GstWebRTC
  16. gi.require_version('GstSdp', '1.0')
  17. from gi.repository import GstSdp
  18. PIPELINE_DESC = '''
  19. webrtcbin name=sendrecv
  20. videotestsrc pattern=ball ! videoconvert ! queue ! vp8enc deadline=1 ! rtpvp8pay !
  21. queue ! application/x-rtp,media=video,encoding-name=VP8,payload=97 ! sendrecv.
  22. pulsesrc device="alsa_output.pci-0000_00_1f.3.analog-stereo.monitor" ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
  23. queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
  24. '''
  25. '''
  26. audiotestsrc wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
  27. queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
  28. '''
  29. class WebRTCClient:
  30. def __init__(self, id_, peer_id, server):
  31. self.id_ = id_
  32. self.conn = None
  33. self.pipe = None
  34. self.webrtc = None
  35. self.peer_id = peer_id
  36. self.server = server or 'wss://127.0.0.1:8443'
  37. async def connect(self):
  38. sslctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
  39. self.conn = await websockets.connect(self.server, ssl=sslctx)
  40. await self.conn.send('HELLO %d' % our_id)
  41. async def setup_call(self):
  42. await self.conn.send('SESSION {}'.format(self.peer_id))
  43. def send_sdp_offer(self, offer):
  44. text = offer.sdp.as_text()
  45. print ('Sending offer:\n%s' % text)
  46. msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
  47. loop = asyncio.new_event_loop()
  48. loop.run_until_complete(self.conn.send(msg))
  49. def on_offer_created(self, promise, _, __):
  50. promise.wait()
  51. reply = promise.get_reply()
  52. offer = reply['offer']
  53. promise = Gst.Promise.new()
  54. self.webrtc.emit('set-local-description', offer, promise)
  55. promise.interrupt()
  56. self.send_sdp_offer(offer)
  57. def on_negotiation_needed(self, element):
  58. promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None)
  59. element.emit('create-offer', None, promise)
  60. def send_ice_candidate_message(self, _, mlineindex, candidate):
  61. icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
  62. loop = asyncio.new_event_loop()
  63. loop.run_until_complete(self.conn.send(icemsg))
  64. def on_incoming_decodebin_stream(self, _, pad):
  65. print("In on_incoming_decodebin_stream...")
  66. if not pad.has_current_caps():
  67. print (pad, 'has no caps, ignoring')
  68. return
  69. caps = pad.get_current_caps()
  70. assert (len(caps))
  71. s = caps[0]
  72. name = s.get_name()
  73. if name.startswith('video'):
  74. print("Connecting incoming video stream...")
  75. q = Gst.ElementFactory.make('queue')
  76. conv = Gst.ElementFactory.make('videoconvert')
  77. # rate = Gst.ElementFactory.make('videorate')
  78. # scale = Gst.ElementFactory.make('videoscale')
  79. # box = Gst.ElementFactory.make('videobox')
  80. # box.set_property("autocrop", True)
  81. caps = Gst.Caps.from_string("video/x-raw,"
  82. # + "width=640,height=480,"
  83. # + "framerate=30/1,"
  84. + "format=YUY2")
  85. capsfilter = Gst.ElementFactory.make("capsfilter", "filter")
  86. capsfilter.set_property("caps", caps)
  87. # disp = Gst.ElementFactory.make('autovideosink')
  88. sink = Gst.ElementFactory.make('v4l2sink')
  89. sink.set_property('device', '/dev/video1')
  90. self.pipe.add(q, conv, capsfilter, sink)
  91. self.pipe.sync_children_states()
  92. pad.link(q.get_static_pad('sink'))
  93. q.link(conv)
  94. conv.link(capsfilter)
  95. capsfilter.link(sink)
  96. elif name.startswith('audio'):
  97. print("Connecting incoming audio stream...")
  98. q = Gst.ElementFactory.make('queue')
  99. conv = Gst.ElementFactory.make('audioconvert')
  100. resample = Gst.ElementFactory.make('audioresample')
  101. sink = Gst.ElementFactory.make('filesink')
  102. sink.set_property('location', os.environ['HOME']+'/tmp/virtmic')
  103. sink.set_property('sync', 'true')
  104. self.pipe.add(q, conv, resample, sink)
  105. self.pipe.sync_children_states()
  106. pad.link(q.get_static_pad('sink'))
  107. q.link(conv)
  108. conv.link(resample)
  109. resample.link(sink)
  110. def on_incoming_stream(self, _, pad):
  111. if pad.direction != Gst.PadDirection.SRC:
  112. return
  113. decodebin = Gst.ElementFactory.make('decodebin')
  114. decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
  115. self.pipe.add(decodebin)
  116. decodebin.sync_state_with_parent()
  117. self.webrtc.link(decodebin)
  118. def start_pipeline(self):
  119. self.pipe = Gst.parse_launch(PIPELINE_DESC)
  120. self.webrtc = self.pipe.get_by_name('sendrecv')
  121. self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed)
  122. self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
  123. self.webrtc.connect('pad-added', self.on_incoming_stream)
  124. self.pipe.set_state(Gst.State.PLAYING)
  125. async def handle_sdp(self, message):
  126. assert (self.webrtc)
  127. msg = json.loads(message)
  128. if 'sdp' in msg:
  129. sdp = msg['sdp']
  130. assert(sdp['type'] == 'answer')
  131. sdp = sdp['sdp']
  132. print ('Received answer:\n%s' % sdp)
  133. res, sdpmsg = GstSdp.SDPMessage.new()
  134. GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
  135. answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
  136. promise = Gst.Promise.new()
  137. self.webrtc.emit('set-remote-description', answer, promise)
  138. promise.interrupt()
  139. elif 'ice' in msg:
  140. ice = msg['ice']
  141. candidate = ice['candidate']
  142. sdpmlineindex = ice['sdpMLineIndex']
  143. self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
  144. async def loop(self):
  145. try:
  146. assert self.conn
  147. async for message in self.conn:
  148. if message == 'HELLO':
  149. await self.setup_call()
  150. elif message == 'SESSION_OK':
  151. self.start_pipeline()
  152. elif message.startswith('ERROR'):
  153. print(message)
  154. return 1
  155. else:
  156. await self.handle_sdp(message)
  157. except Exception:
  158. pass
  159. if self.webrtc:
  160. while True:
  161. time.sleep(60)
  162. return 0
  163. def check_plugins():
  164. needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
  165. "rtpmanager", "videotestsrc", "audiotestsrc"]
  166. missing = list(filter(lambda p: Gst.Registry.get().find_plugin(p) is None, needed))
  167. if len(missing):
  168. print('Missing gstreamer plugins:', missing)
  169. return False
  170. return True
  171. if __name__=='__main__':
  172. Gst.init(None)
  173. if not check_plugins():
  174. sys.exit(1)
  175. parser = argparse.ArgumentParser()
  176. parser.add_argument('peerid', help='String ID of the peer to connect to')
  177. parser.add_argument('--server', help='Signalling server to connect to, eg "wss://127.0.0.1:8443"')
  178. args = parser.parse_args()
  179. our_id = random.randrange(10, 10000)
  180. c = WebRTCClient(our_id, args.peerid, args.server)
  181. asyncio.get_event_loop().run_until_complete(c.connect())
  182. res = asyncio.get_event_loop().run_until_complete(c.loop())
  183. #sys.exit(res)