diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..43357a6 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,38 @@ +sudo: required +dist: xenial + +language: python + +addons: + apt: + packages: + - gstreamer1.0-plugins-base + - gstreamer1.0-plugins-good + - gir1.2-gstreamer-1.0 + - python-gst-1.0 + - python3-gst-1.0 + - python-redis + - python3-redis + - python-gobject + - python-gi + +services: + - redis-server + +python: + - 2.7 + - 3.5 + +virtualenv: + system_site_packages: true # Required as Travis runs python in a virtualenv, thus denying access to system GLib + +install: + - python setup.py install + +script: + - (openob 127.0.0.1 -v travis_1 travis tx 127.0.0.1 -a test 2>&1 | tee tx_log) & + - (tail -f -n0 tx_log &) | grep -q "Started mono audio transmission" + - (openob 127.0.0.1 -v travis_2 travis rx -a test 2>&1 | tee rx_log) & + - (tail -f -n0 rx_log &) | grep -q "Receiving mono audio transmission" + - sleep 10 + - pkill openob \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 243fb7d..de7b88c 100755 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## 4.0.0-dev + +* Upgraded GStreamer libraries to ^1.0 +* Added Python 3 compatability + ## 3.1 * Improved command line interface (Jonty Sewell) diff --git a/bin/openob b/bin/openob index ac61a3a..64ee4c2 100755 --- a/bin/openob +++ b/bin/openob @@ -1,15 +1,13 @@ -#!/usr/bin/python +#!/usr/bin/env python + import sys import argparse import logging -# Thanks gst for messing with argv -argv = sys.argv -sys.argv = [] + from openob.logger import LoggerFactory from openob.node import Node from openob.link_config import LinkConfig from openob.audio_interface import AudioInterface -sys.argv = argv parser = argparse.ArgumentParser(prog='openob', formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('-v', '--verbose', action='store_const', help='Increase logging verbosity', const=logging.DEBUG, default=logging.INFO) @@ -21,7 +19,7 @@ subparsers = parser.add_subparsers(help="The link mode to operate in on this end parser_tx = subparsers.add_parser('tx', formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser_tx.add_argument('receiver_host', type=str, help="The receiver for this transmitter. The machine at this address must be running an rx-mode Manager for this link name") -parser_tx.add_argument('-a', '--audio_input', type=str, choices=['auto', 'alsa', 'jack'], default='auto', help="The audio source type for this end of the link") +parser_tx.add_argument('-a', '--audio_input', type=str, choices=['auto', 'alsa', 'jack', 'test'], default='auto', help="The audio source type for this end of the link") parser_tx_alsa = parser_tx.add_argument_group('alsa', 'Options when using ALSA source type') parser_tx_alsa.add_argument('-d', '--alsa_device', type=str, default='hw:0', help="The ALSA device to connect to for input") parser_tx_jack = parser_tx.add_argument_group('jack', 'Options when using JACK source type') @@ -45,7 +43,7 @@ parser_tx_opus.add_argument('--framesize', type=int, default=20, help="Opus fram parser_tx.set_defaults(mode='tx', fec=True, dtx=False, multicast=False) parser_rx = subparsers.add_parser('rx', formatter_class=argparse.ArgumentDefaultsHelpFormatter) -parser_rx.add_argument('-a', '--audio_output', type=str, choices=['auto', 'alsa', 'jack'], default='auto', help="The audio output type for this end of the link") +parser_rx.add_argument('-a', '--audio_output', type=str, choices=['auto', 'alsa', 'jack', 'test'], default='auto', help="The audio output type for this end of the link") parser_rx_alsa = parser_rx.add_argument_group('alsa', 'Options when using ALSA output type') parser_rx_alsa.add_argument('-d', '--alsa_device', type=str, default='hw:0', help="The ALSA device to connect to for input") parser_rx_jack = parser_rx.add_argument_group('jack', 'Options when using JACK output type') diff --git a/doc/source/conf.py b/doc/source/conf.py index eb9bec8..c74f39a 100755 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -48,9 +48,9 @@ # built documents. # # The short X.Y version. -version = '3.0' +version = '4.0' # The full version, including alpha/beta/rc tags. -release = '3.0 alpha2' +release = '4.0.0-dev' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/doc/source/index.rst b/doc/source/index.rst index 63f13b8..9fd03e6 100755 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -14,10 +14,6 @@ OpenOB can be used for: It can be used on a variety of network connections (including over the internet and mobile links such as 3G), with operating bitrates as low as 16kbps in compressed mode, and support for fully lossless operation in linear PCM mode. -.. WARNING:: - This is the documentation for the currently-unreleased/unstable OpenOB 3 refactoring. If you are using the 2.3 version from PyPi some documentation may be incorrect or misleading. - - Documentation Index =================== diff --git a/doc/source/intro.rst b/doc/source/intro.rst index 4314e84..2ee4837 100755 --- a/doc/source/intro.rst +++ b/doc/source/intro.rst @@ -10,7 +10,7 @@ Architecture OpenOB is a peer to peer audio streaming system with a central configuration server. -The program itself is a set of Python classes wrapping the PyGST bindings for the GStreamer media framework, which itself performs the audio encoding/decoding and transmission. +The program itself is a set of Python classes wrapping the Python GObject bindings for the GStreamer media framework, which itself performs the audio encoding/decoding and transmission. An OpenOB *link* is comprised of a receiver and transmitter pair. @@ -51,7 +51,7 @@ The following is a recommended set of specifications that are known to run OpenO - Dual-core Intel Atom, i3 or better @ 1.2GHz or better - 512MB RAM (2GB if you want a desktop environment) - 100Mbps NIC -- Debian Wheezy (7.0) +- Debian Jessie (8.0) OpenOB has been known to run on systems with significantly lower specifications. diff --git a/doc/source/tutorial.rst b/doc/source/tutorial.rst index 0d28175..9efcfcc 100755 --- a/doc/source/tutorial.rst +++ b/doc/source/tutorial.rst @@ -17,25 +17,27 @@ OpenOB relies on the GStreamer media framework for the underlying audio transpor Additionally, OpenOB needs some Python extensions, and on the configuration server, we must also install the Redis server used for configuration management. -On Debian you can install the prerequisites with the following command: +On Debian Stretch / Ubuntu Xenial you can install the prerequisites with the following command: .. code-block:: bash - sudo apt-get install python-gst0.10 python-setuptools gstreamer0.10-plugins-base gstreamer0.10-plugins-bad gstreamer0.10-plugins-good gstreamer0.10-plugins-ugly gstreamer0.10-tools python-gobject python-gobject-2 gstreamer0.10-alsa python-argparse python-redis + sudo apt install gstreamer1.0-plugins-base gstreamer1.0-plugins-good gir1.2-gstreamer-1.0 python-gst-1.0 python-redis python-gi python-setuptools -This should also work on Ubuntu. Your GStreamer implementation must be recent enough to support Opus; this is supported in Ubuntu 13.04 and Debian Wheezy or newer. In order to ensure compatibility, it is recommended that both ends of the link use the same version of GStreamer, which is most easily achieved by running the same operating system version on each end and installing the distribution's packages as detailed above. +If you wish to use Python 3, you must install `python3-redis`, `python3-gst-1.0` and `python3-setuptools` instead of the Python 2 equivalents. +The GStreamer Opus plugin has graduated from the 'bad' plugins repository to the 'base' repository as of 2015. Older distributions may require the `gstreamer1.0-plugins-bad` package installed. +In order to ensure compatibility, it is recommended that both ends of the link use the same version of GStreamer, which is most easily achieved by running the same operating system version on each end and installing the distribution's packages as detailed above. On one machine, which for this tutorial we'll assume is also our receiver, we'll install Redis: .. code-block:: bash - [user@rx-host] $ sudo apt-get install redis-server + [user@rx-host] $ sudo apt install redis-server We also need to make sure Redis binds itself to be accessible to remote machines, not just localhost. You can edit ``/etc/redis/redis.conf`` yourself or run the following to instantly make this adjustment .. code-block:: bash - [user@rx-host] $ sudo sed -i.bak ‘s/bind 127.*/bind 0.0.0.0/’ /etc/redis/redis.conf && sudo service redis restart + [user@rx-host] $ sudo sed -i.bak ‘s/bind 127.*/bind 0.0.0.0/’ /etc/redis/redis.conf && sudo service redis-server restart Installing OpenOB ----------------- @@ -44,7 +46,7 @@ Now we can install OpenOB itself. You can install from git for the bleeding edge .. code-block:: bash - sudo easy_install OpenOB + sudo pip install OpenOB Networking ---------- diff --git a/openob/link_config.py b/openob/link_config.py index a392cf6..8c221a8 100755 --- a/openob/link_config.py +++ b/openob/link_config.py @@ -27,7 +27,7 @@ def __init__(self, link_name, redis_host): self.redis = None while True: try: - self.redis = redis.StrictRedis(self.redis_host) + self.redis = redis.StrictRedis(host=self.redis_host, charset="utf-8", decode_responses=True) break except Exception as e: self.logger.error( diff --git a/openob/node.py b/openob/node.py index d98d542..1f158a2 100755 --- a/openob/node.py +++ b/openob/node.py @@ -4,8 +4,6 @@ from openob.rtp.tx import RTPTransmitter from openob.rtp.rx import RTPReceiver from openob.link_config import LinkConfig -from gst import ElementNotFoundError - class Node(object): @@ -50,9 +48,6 @@ def run_link(self, link_config, audio_interface): link_logger.debug("Got caps from transmitter, setting config") link_config.set("caps", caps) transmitter.loop() - except ElementNotFoundError as e: - link_logger.critical("GStreamer element missing: %s - will now exit" % e) - sys.exit(1) except Exception as e: link_logger.exception("Transmitter crashed for some reason! Restarting...") time.sleep(0.5) @@ -65,9 +60,6 @@ def run_link(self, link_config, audio_interface): receiver = RTPReceiver(self.node_name, link_config, audio_interface) receiver.run() receiver.loop() - except ElementNotFoundError as e: - link_logger.critical("GStreamer element missing: %s - will now exit" % e) - sys.exit(1) except Exception as e: link_logger.exception("Receiver crashed for some reason! Restarting...") time.sleep(0.1) diff --git a/openob/rtp/rx.py b/openob/rtp/rx.py index 3eb703c..3771533 100755 --- a/openob/rtp/rx.py +++ b/openob/rtp/rx.py @@ -1,137 +1,201 @@ -import gobject -import pygst -pygst.require("0.10") -import gst -import re -from openob.logger import LoggerFactory +import gi +gi.require_version('Gst', '1.0') +from gi.repository import Gst, GLib +Gst.init(None) +from openob.logger import LoggerFactory class RTPReceiver(object): def __init__(self, node_name, link_config, audio_interface): """Sets up a new RTP receiver""" - self.started = False - self.pipeline = gst.Pipeline("rx") - self.bus = self.pipeline.get_bus() - self.bus.connect("message", self.on_message) + self.link_config = link_config self.audio_interface = audio_interface + self.logger_factory = LoggerFactory() self.logger = self.logger_factory.getLogger('node.%s.link.%s.%s' % (node_name, self.link_config.name, self.audio_interface.mode)) - self.logger.info('Creating RTP reception pipeline') - caps = self.link_config.get("caps") + self.logger.info('Creating reception pipeline') + + self.build_pipeline() + + def run(self): + self.pipeline.set_state(Gst.State.PLAYING) + self.logger.info('Listening for stream on %s:%i' % (self.link_config.receiver_host, self.link_config.port)) + + def loop(self): + try: + self.main_loop = GLib.MainLoop() + self.main_loop.run() + except Exception as e: + self.logger.exception('Encountered a problem in the MainLoop, tearing down the pipeline: %s' % e) + self.pipeline.set_state(Gst.State.NULL) + + def build_pipeline(self): + self.pipeline = Gst.Pipeline.new('rx') + + self.started = False + bus = self.pipeline.get_bus() + + self.transport = self.build_transport() + self.decoder = self.build_decoder() + self.output = self.build_audio_interface() + + self.pipeline.add(self.transport) + self.pipeline.add(self.decoder) + self.pipeline.add(self.output) + self.transport.link(self.decoder) + self.decoder.link(self.output) + + bus.add_signal_watch() + bus.connect('message', self.on_message) + + def build_audio_interface(self): + self.logger.debug('Building audio output bin') + bin = Gst.Bin.new('audio') + # Audio output if self.audio_interface.type == 'auto': - self.sink = gst.element_factory_make("autoaudiosink") + sink = Gst.ElementFactory.make('autoaudiosink') elif self.audio_interface.type == 'alsa': - self.sink = gst.element_factory_make("alsasink") - self.sink.set_property('device', self.audio_interface.alsa_device) + sink = Gst.ElementFactory.make('alsasink') + sink.set_property('device', self.audio_interface.alsa_device) elif self.audio_interface.type == 'jack': - self.sink = gst.element_factory_make("jackaudiosink") + sink = Gst.ElementFactory.make('jackaudiosink') if self.audio_interface.jack_auto: - self.sink.set_property('connect', 'auto') + sink.set_property('connect', 'auto') else: - self.sink.set_property('connect', 'none') - self.sink.set_property('name', self.audio_interface.jack_name) - self.sink.set_property('client-name', self.audio_interface.jack_name) + sink.set_property('connect', 'none') + sink.set_property('name', self.audio_interface.jack_name) + sink.set_property('client-name', self.audio_interface.jack_name) + elif self.audio_interface.type == 'test': + sink = Gst.ElementFactory.make('fakesink') + bin.add(sink) + # Audio resampling and conversion - self.audioresample = gst.element_factory_make("audioresample") - self.audioconvert = gst.element_factory_make("audioconvert") - self.audioresample.set_property('quality', 6) + resample = Gst.ElementFactory.make('audioresample') + resample.set_property('quality', 9) + bin.add(resample) + + convert = Gst.ElementFactory.make('audioconvert') + bin.add(convert) + + # Our level monitor, also used for continuous audio + level = Gst.ElementFactory.make('level') + level.set_property('message', True) + level.set_property('interval', 1000000000) + bin.add(level) + + resample.link(convert) + convert.link(level) + level.link(sink) + + bin.add_pad(Gst.GhostPad.new('sink', resample.get_static_pad('sink'))) + + return bin + + def build_decoder(self): + self.logger.debug('Building decoder bin') + bin = Gst.Bin.new('decoder') # Decoding and depayloading if self.link_config.encoding == 'opus': - self.decoder = gst.element_factory_make("opusdec", "decoder") - self.decoder.set_property('use-inband-fec', True) # FEC - self.decoder.set_property('plc', True) # Packet loss concealment - self.depayloader = gst.element_factory_make( - "rtpopusdepay", "depayloader") + decoder = Gst.ElementFactory.make('opusdec', 'decoder') + decoder.set_property('use-inband-fec', True) # FEC + decoder.set_property('plc', True) # Packet loss concealment + depayloader = Gst.ElementFactory.make( + 'rtpopusdepay', 'depayloader') elif self.link_config.encoding == 'pcm': - self.depayloader = gst.element_factory_make( - "rtpL16depay", "depayloader") - - # RTP stuff - self.rtpbin = gst.element_factory_make('gstrtpbin') - self.rtpbin.set_property('latency', self.link_config.jitter_buffer) - self.rtpbin.set_property('autoremove', True) - self.rtpbin.set_property('do-lost', True) - #self.rtpbin.set_property('buffer-mode', 1) + depayloader = Gst.ElementFactory.make( + 'rtpL16depay', 'depayloader') + else: + self.logger.critical('Unknown encoding type %s' % self.link_config.encoding) + + bin.add(depayloader) + + bin.add_pad(Gst.GhostPad.new('sink', depayloader.get_static_pad('sink'))) + + if 'decoder' in locals(): + bin.add(decoder) + depayloader.link(decoder) + bin.add_pad(Gst.GhostPad.new('src', decoder.get_static_pad('src'))) + else: + bin.add_pad(Gst.GhostPad.new('src', depayloader.get_static_pad('src'))) + + return bin + + def build_transport(self): + self.logger.debug('Building RTP transport bin') + bin = Gst.Bin.new('transport') + + caps = self.link_config.get('caps').replace('\\', '') + udpsrc_caps = Gst.Caps.from_string(caps) + # Where audio comes in - self.udpsrc_rtpin = gst.element_factory_make('udpsrc') - self.udpsrc_rtpin.set_property('port', self.link_config.port) + udpsrc = Gst.ElementFactory.make('udpsrc', 'udpsrc') + udpsrc.set_property('port', self.link_config.port) + udpsrc.set_property('caps', udpsrc_caps) + udpsrc.set_property('timeout', 3000000000) if self.link_config.multicast: - self.udpsrc_rtpin.set_property('auto_multicast', True) - self.udpsrc_rtpin.set_property('multicast_group', self.link_config.receiver_host) + udpsrc.set_property('auto_multicast', True) + udpsrc.set_property('multicast_group', self.link_config.receiver_host) self.logger.info('Multicast mode enabled') - caps = caps.replace('\\', '') - # Fix for gstreamer bug in rtpopuspay fixed in GST-plugins-bad - # 50140388d2b62d32dd9d0c071e3051ebc5b4083b, bug 686547 - if self.link_config.encoding == 'opus': - caps = re.sub(r'(caps=.+ )', '', caps) - udpsrc_caps = gst.caps_from_string(caps) - self.udpsrc_rtpin.set_property('caps', udpsrc_caps) - self.udpsrc_rtpin.set_property('timeout', 3000000) + bin.add(udpsrc) - # Our level monitor, also used for continuous audio - self.level = gst.element_factory_make("level") - self.level.set_property('message', True) - self.level.set_property('interval', 1000000000) - - # And now we've got it all set up we need to add the elements - self.pipeline.add( - self.audioconvert, self.audioresample, self.sink, - self.level, self.depayloader, self.rtpbin, self.udpsrc_rtpin) - if self.link_config.encoding != 'pcm': - self.pipeline.add(self.decoder) - gst.element_link_many( - self.depayloader, self.decoder, self.audioconvert) - else: - gst.element_link_many(self.depayloader, self.audioconvert) - gst.element_link_many( - self.audioconvert, self.audioresample, self.level, - self.sink) - self.logger.debug(self.sink) - # Now the RTP pads - self.udpsrc_rtpin.link_pads('src', self.rtpbin, 'recv_rtp_sink_0') + rtpbin = Gst.ElementFactory.make('rtpbin', 'rtpbin') + rtpbin.set_property('latency', self.link_config.jitter_buffer) + rtpbin.set_property('autoremove', True) + rtpbin.set_property('do-lost', True) + bin.add(rtpbin) + udpsrc.link_pads('src', rtpbin, 'recv_rtp_sink_0') + + valve = Gst.ElementFactory.make('valve', 'valve') + bin.add(valve) + + bin.add_pad(Gst.GhostPad.new('src', valve.get_static_pad('src'))) # Attach callbacks for dynamic pads (RTP output) and busses - self.rtpbin.connect('pad-added', self.rtpbin_pad_added) - self.bus.add_signal_watch() + rtpbin.connect('pad-added', self.rtpbin_pad_added) + + return bin # Our RTPbin won't give us an audio pad till it receives, so we need to # attach it here def rtpbin_pad_added(self, obj, pad): + valve = self.transport.get_by_name('valve') + rtpbin = self.transport.get_by_name('rtpbin') + # Unlink first. - self.rtpbin.unlink(self.depayloader) + rtpbin.unlink(valve) # Relink - self.rtpbin.link(self.depayloader) + rtpbin.link(valve) def on_message(self, bus, message): - if message.type == gst.MESSAGE_ELEMENT: - if message.structure.get_name() == 'level': - if self.started is False: - self.started = True - #gst.DEBUG_BIN_TO_DOT_FILE(self.pipeline, gst.DEBUG_GRAPH_SHOW_ALL, 'rx-graph') - if len(message.structure['peak']) == 1: - self.logger.info("Receiving mono audio transmission") + if message.type == Gst.MessageType.ELEMENT: + struct = message.get_structure() + if struct != None: + if struct.get_name() == 'level': + if self.started is False: + self.started = True + if len(struct.get_value('peak')) == 1: + self.logger.info('Receiving mono audio transmission') + else: + self.logger.info('Receiving stereo audio transmission') else: - self.logger.info("Receiving stereo audio transmission") - - if message.structure.get_name() == 'GstUDPSrcTimeout': - # Only UDP source configured to emit timeouts is the audio - # input - self.logger.critical("No data received for 3 seconds!") - if self.started: - self.logger.critical("Shutting down receiver for restart") - self.pipeline.set_state(gst.STATE_NULL) - self.loop.quit() + if len(struct.get_value('peak')) == 1: + self.logger.debug('Level: %.2f', struct.get_value('peak')[0]) + else: + self.logger.debug('Levels: L %.2f R %.2f' % (struct.get_value('peak')[0], struct.get_value('peak')[1])) + + if struct.get_name() == 'GstUDPSrcTimeout': + # Gst.debug_bin_to_dot_file(self.pipeline, Gst.DebugGraphDetails.ALL, 'rx-graph') + # Only UDP source configured to emit timeouts is the audio input + self.logger.critical('No data received for 3 seconds!') + if self.started: + self.logger.critical('Shutting down receiver for restart') + self.pipeline.set_state(Gst.State.NULL) + self.main_loop.quit() return True - def run(self): - self.pipeline.set_state(gst.STATE_PLAYING) - self.logger.info('Listening for stream on %s:%i' % (self.link_config.receiver_host, self.link_config.port)) - def loop(self): - self.loop = gobject.MainLoop() - self.loop.run() diff --git a/openob/rtp/tx.py b/openob/rtp/tx.py index d660ca4..90481e8 100755 --- a/openob/rtp/tx.py +++ b/openob/rtp/tx.py @@ -1,167 +1,205 @@ -import gobject -import pygst -pygst.require("0.10") -import gst +import gi +gi.require_version('Gst', '1.0') +from gi.repository import Gst, GLib +Gst.init(None) + import time -import re from openob.logger import LoggerFactory - class RTPTransmitter(object): def __init__(self, node_name, link_config, audio_interface): """Sets up a new RTP transmitter""" - self.started = False - self.caps = 'None' - self.pipeline = gst.Pipeline("tx") - self.bus = self.pipeline.get_bus() - self.bus.connect("message", self.on_message) + self.link_config = link_config self.audio_interface = audio_interface + self.logger_factory = LoggerFactory() self.logger = self.logger_factory.getLogger('node.%s.link.%s.%s' % (node_name, self.link_config.name, self.audio_interface.mode)) - self.logger.info("Creating RTP transmission pipeline") + self.logger.info('Creating transmission pipeline') + + self.build_pipeline() + + def run(self): + self.pipeline.set_state(Gst.State.PLAYING) + # Gst.debug_bin_to_dot_file(self.pipeline, Gst.DebugGraphDetails.ALL, 'tx-graph') + + while self.caps == None: + caps = self.transport.get_by_name('udpsink').get_static_pad('sink').get_property('caps') + + if caps == None: + self.logger.warn('Waiting for audio interface/caps') + time.sleep(0.1) + else: + self.caps = caps.to_string() + + def loop(self): + try: + loop = GLib.MainLoop() + loop.run() + except Exception as e: + self.logger.exception('Encountered a problem in the MainLoop, tearing down the pipeline: %s' % e) + self.pipeline.set_state(Gst.State.NULL) + + def build_pipeline(self): + self.pipeline = Gst.Pipeline.new('tx') + + self.started = False + self.caps = None + + bus = self.pipeline.get_bus() + + self.source = self.build_audio_interface() + self.encoder = self.build_encoder() + self.transport = self.build_transport() + + self.pipeline.add(self.source) + self.pipeline.add(self.encoder) + self.pipeline.add(self.transport) + self.source.link(self.encoder) + self.encoder.link(self.transport) + + # Connect our bus up + bus.add_signal_watch() + bus.connect('message', self.on_message) + + def build_audio_interface(self): + self.logger.debug('Building audio input bin') + bin = Gst.Bin.new('audio') + # Audio input if self.audio_interface.type == 'auto': - self.source = gst.element_factory_make('autoaudiosrc') + source = Gst.ElementFactory.make('autoaudiosrc') elif self.audio_interface.type == 'alsa': - self.source = gst.element_factory_make('alsasrc') - self.source.set_property('device', self.audio_interface.alsa_device) + source = Gst.ElementFactory.make('alsasrc') + source.set_property('device', self.audio_interface.alsa_device) elif self.audio_interface.type == 'jack': - self.source = gst.element_factory_make("jackaudiosrc") + source = Gst.ElementFactory.make('jackaudiosrc') if self.audio_interface.jack_auto: - self.source.set_property('connect', 'auto') + source.set_property('connect', 'auto') else: - self.source.set_property('connect', 'none') - self.source.set_property('buffer-time', 50000) - self.source.set_property('name', self.audio_interface.jack_name) - self.source.set_property('client-name', self.audio_interface.jack_name) - # Audio resampling and conversion - self.audioresample = gst.element_factory_make("audioresample") - self.audioconvert = gst.element_factory_make("audioconvert") - self.audioresample.set_property('quality', 9) # SRC - - # Encoding and payloading - if self.link_config.encoding == 'opus': - self.encoder = gst.element_factory_make("opusenc", "encoder") - self.encoder.set_property('bitrate', int(self.link_config.bitrate) * 1000) - self.encoder.set_property('tolerance', 80000000) - self.encoder.set_property('frame-size', self.link_config.opus_framesize) - self.encoder.set_property('complexity', int(self.link_config.opus_complexity)) - self.encoder.set_property('inband-fec', self.link_config.opus_fec) - self.encoder.set_property('packet-loss-percentage', int(self.link_config.opus_loss_expectation)) - self.encoder.set_property('dtx', self.link_config.opus_dtx) - print(self.encoder.get_properties('bitrate', 'dtx', 'inband-fec')) - self.payloader = gst.element_factory_make("rtpopuspay", "payloader") - elif self.link_config.encoding == 'pcm': - # we have no encoder for PCM operation - self.payloader = gst.element_factory_make("rtpL16pay", "payloader") - else: - self.logger.critical("Unknown encoding type %s" % self.link_config.encoding) - # TODO: Add a tee here, and sort out creating multiple UDP sinks for multipath - # Now the RTP bits - # We'll send audio out on this - self.udpsink_rtpout = gst.element_factory_make("udpsink", "udpsink_rtp") - self.udpsink_rtpout.set_property('host', self.link_config.receiver_host) - self.udpsink_rtpout.set_property('port', self.link_config.port) - self.logger.info('Set receiver to %s:%i' % (self.link_config.receiver_host, self.link_config.port)) + source.set_property('connect', 'none') + source.set_property('buffer-time', 50000) + source.set_property('name', self.audio_interface.jack_name) + source.set_property('client-name', self.audio_interface.jack_name) + elif self.audio_interface.type == 'test': + source = Gst.ElementFactory.make('audiotestsrc') - if self.link_config.multicast: - self.udpsink_rtpout.set_property('auto_multicast', True) - self.logger.info('Multicast mode enabled') + bin.add(source) - # Our RTP manager - self.rtpbin = gst.element_factory_make("gstrtpbin", "gstrtpbin") - self.rtpbin.set_property('latency', 0) # Our level monitor - self.level = gst.element_factory_make("level") - self.level.set_property('message', True) - self.level.set_property('interval', 1000000000) + level = Gst.ElementFactory.make('level') + level.set_property('message', True) + level.set_property('interval', 1000000000) + bin.add(level) - # Add a capsfilter to allow specification of input sample rate - self.capsfilter = gst.element_factory_make("capsfilter") + # Audio resampling and conversion + resample = Gst.ElementFactory.make('audioresample') + resample.set_property('quality', 9) # SRC + bin.add(resample) - # Add to the pipeline - self.pipeline.add( - self.source, self.capsfilter, self.audioresample, self.audioconvert, - self.payloader, self.udpsink_rtpout, self.rtpbin, - self.level) + convert = Gst.ElementFactory.make('audioconvert') + bin.add(convert) - if self.link_config.encoding != 'pcm': - # Only add the encoder if we're not in PCM mode - self.pipeline.add(self.encoder) + # Add a capsfilter to allow specification of input sample rate + capsfilter = Gst.ElementFactory.make('capsfilter') - # Decide which format to apply to the capsfilter (Jack uses float) - if self.audio_interface.type == 'jack': - data_type = 'audio/x-raw-float' - else: - data_type = 'audio/x-raw-int' + caps = Gst.Caps.new_empty_simple('audio/x-raw') # if audio_rate has been specified, then add that to the capsfilter if self.audio_interface.samplerate != 0: - self.capsfilter.set_property( - "caps", gst.Caps('%s, channels=2, rate=%d' % (data_type, self.audio_interface.samplerate))) + caps.set_value('rate', self.audio_interface.samplerate) + + self.logger.debug(caps.to_string()) + capsfilter.set_property('caps', caps) + bin.add(capsfilter) + + source.link(level) + level.link(resample) + resample.link(convert) + convert.link(capsfilter) + + bin.add_pad(Gst.GhostPad.new('src', capsfilter.get_static_pad('src'))) + + return bin + + def build_encoder(self): + self.logger.debug('Building encoder bin') + bin = Gst.Bin.new('encoder') + + # Encoding and payloading + if self.link_config.encoding == 'opus': + encoder = Gst.ElementFactory.make('opusenc', 'encoder') + encoder.set_property('bitrate', int(self.link_config.bitrate) * 1000) + encoder.set_property('tolerance', 80000000) + encoder.set_property('frame-size', self.link_config.opus_framesize) + encoder.set_property('complexity', int(self.link_config.opus_complexity)) + encoder.set_property('inband-fec', self.link_config.opus_fec) + encoder.set_property('packet-loss-percentage', int(self.link_config.opus_loss_expectation)) + encoder.set_property('dtx', self.link_config.opus_dtx) + + payloader = Gst.ElementFactory.make('rtpopuspay', 'payloader') + elif self.link_config.encoding == 'pcm': + # we have no encoder for PCM operation + payloader = Gst.ElementFactory.make('rtpL16pay', 'payloader') else: - self.capsfilter.set_property( - "caps", gst.Caps('%s, channels=2' % data_type)) + self.logger.critical('Unknown encoding type %s' % self.link_config.encoding) - # Then continue linking the pipeline together - gst.element_link_many( - self.source, self.capsfilter, self.level, self.audioresample, self.audioconvert) + bin.add(payloader) - # Now we get to link this up to our encoder/payloader - if self.link_config.encoding != 'pcm': - gst.element_link_many( - self.audioconvert, self.encoder, self.payloader) + if 'encoder' in locals(): + bin.add(encoder) + encoder.link(payloader) + bin.add_pad(Gst.GhostPad.new('sink', encoder.get_static_pad('sink'))) else: - gst.element_link_many(self.audioconvert, self.payloader) - - # And now the RTP bits - self.payloader.link_pads('src', self.rtpbin, 'send_rtp_sink_0') - self.rtpbin.link_pads('send_rtp_src_0', self.udpsink_rtpout, 'sink') - # self.udpsrc_rtcpin.link_pads('src', self.rtpbin, 'recv_rtcp_sink_0') - # # RTCP SRs - # self.rtpbin.link_pads('send_rtcp_src_0', self.udpsink_rtcpout, 'sink') - # Connect our bus up - self.bus.add_signal_watch() - self.bus.connect('message', self.on_message) + bin.add_pad(Gst.GhostPad.new('sink', payloader.get_static_pad('sink'))) - def run(self): - self.pipeline.set_state(gst.STATE_PLAYING) - while self.caps == 'None': - self.logger.debug(self.udpsink_rtpout.get_state()) - self.caps = str( - self.udpsink_rtpout.get_pad('sink').get_property('caps')) - # Fix for gstreamer bug in rtpopuspay fixed in GST-plugins-bad - # 50140388d2b62d32dd9d0c071e3051ebc5b4083b, bug 686547 - if self.link_config.encoding == 'opus': - self.caps = re.sub(r'(caps=.+ )', '', self.caps) + bin.add_pad(Gst.GhostPad.new('src', payloader.get_static_pad('src'))) - if self.caps == 'None': - self.logger.warn("Waiting for audio interface/caps") + return bin - time.sleep(0.1) + def build_transport(self): + self.logger.debug('Building RTP transport bin') + bin = Gst.Bin.new('transport') + # Our RTP manager + rtpbin = Gst.ElementFactory.make('rtpbin', 'rtpbin') + rtpbin.set_property('latency', 0) + bin.add(rtpbin) - def loop(self): - try: - self.loop = gobject.MainLoop() - self.loop.run() - except Exception as e: - self.logger.exception("Encountered a problem in the MainLoop, tearing down the pipeline: %s" % e) - self.pipeline.set_state(gst.STATE_NULL) + # TODO: Add a tee here, and sort out creating multiple UDP sinks for multipath + udpsink = Gst.ElementFactory.make('udpsink', 'udpsink') + udpsink.set_property('host', self.link_config.receiver_host) + udpsink.set_property('port', self.link_config.port) + self.logger.info('Set receiver to %s:%i' % (self.link_config.receiver_host, self.link_config.port)) + + if self.link_config.multicast: + udpsink.set_property('auto_multicast', True) + self.logger.info('Multicast mode enabled') + bin.add(udpsink) + + bin.add_pad(Gst.GhostPad.new('sink', rtpbin.get_request_pad('send_rtp_sink_0'))) + + rtpbin.link_pads('send_rtp_src_0', udpsink, 'sink') + + return bin def on_message(self, bus, message): - if message.type == gst.MESSAGE_ELEMENT: - if message.structure.get_name() == 'level': - if self.started is False: - self.started = True - #gst.DEBUG_BIN_TO_DOT_FILE(self.pipeline, gst.DEBUG_GRAPH_SHOW_ALL, 'tx-graph') - #self.logger.debug(self.source.get_property('actual-buffer-time')) - if len(message.structure['peak']) == 1: - self.logger.info("Started mono audio transmission") + if message.type == Gst.MessageType.ELEMENT: + struct = message.get_structure() + if struct != None: + if struct.get_name() == 'level': + if self.started is False: + self.started = True + if len(struct.get_value('peak')) == 1: + self.logger.info('Started mono audio transmission') + else: + self.logger.info('Started stereo audio transmission') else: - self.logger.info("Started stereo audio transmission") + if len(struct.get_value('peak')) == 1: + self.logger.debug('Level: %.2f', struct.get_value('peak')[0]) + else: + self.logger.debug('Levels: L %.2f R %.2f' % (struct.get_value('peak')[0], struct.get_value('peak')[1])) return True def get_caps(self): diff --git a/setup.py b/setup.py index 119cb76..9c00e50 100755 --- a/setup.py +++ b/setup.py @@ -9,8 +9,6 @@ url='http://jamesharrison.github.com/openob', scripts=['bin/openob'], packages=['openob', 'openob.rtp'], - requires=['pygst', 'redis'], - install_requires=['redis'], classifiers=["Programming Language :: Python", "Programming Language :: Python :: 3", "Programming Language :: Python :: 2",