pedalboard/plugins/GSMFullRateCompressor.h (122 lines of code) (raw):

/* * pedalboard * Copyright 2022 Spotify AB * * Licensed under the GNU Public License, Version 3.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * https://www.gnu.org/licenses/gpl-3.0.html * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "../Plugin.h" #include "../plugin_templates/FixedBlockSize.h" #include "../plugin_templates/ForceMono.h" #include "../plugin_templates/PrimeWithSilence.h" #include "../plugin_templates/Resample.h" extern "C" { #include <gsm.h> } namespace Pedalboard { /* * A small C++ wrapper around the C-based libgsm object. * Used mostly to avoid leaking memory. */ class GSMWrapper { public: GSMWrapper() {} ~GSMWrapper() { reset(); } operator bool() const { return _gsm != nullptr; } void reset() { gsm_destroy(_gsm); _gsm = nullptr; } gsm getContext() { if (!_gsm) _gsm = gsm_create(); return _gsm; } private: gsm _gsm = nullptr; }; class GSMFullRateCompressorInternal : public Plugin { public: virtual ~GSMFullRateCompressorInternal(){}; virtual void prepare(const juce::dsp::ProcessSpec &spec) override { bool specChanged = lastSpec.sampleRate != spec.sampleRate || lastSpec.maximumBlockSize < spec.maximumBlockSize || lastSpec.numChannels != spec.numChannels; if (!encoder || specChanged) { reset(); if (spec.sampleRate != GSM_SAMPLE_RATE) { throw std::runtime_error("GSMCompressor plugin must be run at " + std::to_string(GSM_SAMPLE_RATE) + "Hz!"); } if (!encoder.getContext()) { throw std::runtime_error("Failed to initialize GSM encoder."); } if (!decoder.getContext()) { throw std::runtime_error("Failed to initialize GSM decoder."); } lastSpec = spec; } } int process( const juce::dsp::ProcessContextReplacing<float> &context) override final { auto ioBlock = context.getOutputBlock(); if (ioBlock.getNumSamples() != GSM_FRAME_SIZE_SAMPLES) { throw std::runtime_error("GSMCompressor plugin must be passed exactly " + std::to_string(GSM_FRAME_SIZE_SAMPLES) + " at a time."); } if (ioBlock.getNumChannels() != 1) { throw std::runtime_error( "GSMCompressor plugin must be passed mono input!"); } // Convert samples to signed 16-bit integer first, // then pass to the GSM Encoder, then immediately back // around to the GSM decoder. short frame[GSM_FRAME_SIZE_SAMPLES]; juce::AudioDataConverters::convertFloatToInt16LE( ioBlock.getChannelPointer(0), frame, GSM_FRAME_SIZE_SAMPLES); // Actually do the GSM processing! gsm_frame encodedFrame; gsm_encode(encoder.getContext(), frame, encodedFrame); if (gsm_decode(decoder.getContext(), encodedFrame, frame) < 0) { throw std::runtime_error("GSM decoder could not decode frame!"); } juce::AudioDataConverters::convertInt16LEToFloat( frame, ioBlock.getChannelPointer(0), GSM_FRAME_SIZE_SAMPLES); return GSM_FRAME_SIZE_SAMPLES; } void reset() override final { encoder.reset(); decoder.reset(); } static constexpr size_t GSM_FRAME_SIZE_SAMPLES = 160; static constexpr int GSM_SAMPLE_RATE = 8000; private: GSMWrapper encoder; GSMWrapper decoder; }; /** * Use the GSMFullRateCompressorInternal plugin, but: * - ensure that it only ever sees fixed-size blocks of 160 samples * - prime the input with a single block of silence * - resample whatever input sample rate is provided down to 8kHz * - only provide mono input to the plugin, and copy the mono signal * back to stereo if necessary */ using GSMFullRateCompressor = ForceMono<Resample< PrimeWithSilence< FixedBlockSize<GSMFullRateCompressorInternal, GSMFullRateCompressorInternal::GSM_FRAME_SIZE_SAMPLES>, float, GSMFullRateCompressorInternal::GSM_FRAME_SIZE_SAMPLES>, float, GSMFullRateCompressorInternal::GSM_SAMPLE_RATE>>; inline void init_gsm_full_rate_compressor(py::module &m) { py::class_<GSMFullRateCompressor, Plugin, std::shared_ptr<GSMFullRateCompressor>>( m, "GSMFullRateCompressor", "An audio degradation/compression plugin that applies the GSM \"Full " "Rate\" compression algorithm to emulate the sound of a " "2G cellular phone connection. This plugin internally resamples the " "input audio to a fixed sample rate of 8kHz (required by the GSM Full " "Rate codec), although the quality of the resampling algorithm " "can be specified.") .def(py::init([](ResamplingQuality quality) { auto plugin = std::make_unique<GSMFullRateCompressor>(); plugin->getNestedPlugin().setQuality(quality); return plugin; }), py::arg("quality") = ResamplingQuality::WindowedSinc8) .def("__repr__", [](const GSMFullRateCompressor &plugin) { std::ostringstream ss; ss << "<pedalboard.GSMFullRateCompressor"; ss << " at " << &plugin; ss << ">"; return ss.str(); }) .def_property( "quality", [](GSMFullRateCompressor &plugin) { return plugin.getNestedPlugin().getQuality(); }, [](GSMFullRateCompressor &plugin, ResamplingQuality quality) { return plugin.getNestedPlugin().setQuality(quality); }); } }; // namespace Pedalboard