Skip to content

Commit

Permalink
Merge pull request #955 from OpenC3/mqtt_updates
Browse files Browse the repository at this point in the history
Mqtt Stream and Stream Interface. Plus fixes and faraday multipart
  • Loading branch information
ryanmelt authored Dec 8, 2023
2 parents 6bf1244 + 3db9483 commit ff9d2e1
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 9 deletions.
20 changes: 11 additions & 9 deletions openc3/lib/openc3/interfaces/mqtt_interface.rb
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ def disconnect
end

def read
topic = @read_topics.shift
packet = super()
topic = @read_topics.shift
return nil unless packet
identified_packet = @read_packets_by_topic[topic]
if identified_packet
Expand All @@ -175,15 +175,17 @@ def read
end

def write(packet)
topics = packet.meta['TOPIC']
topics = packet.meta['TOPICS'] unless topics
if topics
topics.each do |topic|
@write_topics << topic
super(packet)
@write_mutex.synchronize do
topics = packet.meta['TOPIC']
topics = packet.meta['TOPICS'] unless topics
if topics
topics.each do |topic|
@write_topics << topic
super(packet)
end
else
raise "Command packet #{packet.target_name} #{packet.packet_name} requires a META TOPIC or TOPICS"
end
else
raise "Command packet #{packet.target_name} #{packet.packet_name} requires a META TOPIC or TOPICS"
end
end

Expand Down
78 changes: 78 additions & 0 deletions openc3/lib/openc3/interfaces/mqtt_stream_interface.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# encoding: ascii-8bit

# Copyright 2023 OpenC3, Inc.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# This file may also be used under the terms of a commercial license
# if purchased from OpenC3, Inc.

require 'openc3/interfaces/stream_interface'
require 'openc3/streams/mqtt_stream'

module OpenC3
class MqttStreamInterface < StreamInterface
# @param hostname [String] MQTT server to connect to
# @param port [Integer] MQTT port
# @param ssl [Boolean] Use SSL true/false
def initialize(hostname, port = 1883, ssl = false, write_topic = nil, read_topic = nil, protocol_type = nil, *protocol_args)
super(protocol_type, protocol_args)
@hostname = hostname
@port = Integer(port)
@ssl = ConfigParser.handle_true_false(ssl)
@write_topic = ConfigParser.handle_nil(write_topic)
@read_topic = ConfigParser.handle_nil(read_topic)
@username = nil
@password = nil
@cert = nil
@key = nil
@ca_file = nil
end

# Creates a new {SerialStream} using the parameters passed in the constructor
def connect
@stream = MqttStream.new(@hostname, @port, @ssl, @write_topic, @read_topic)
@stream.username = @username if @username
@stream.password = @password if @password
@stream.cert = @cert if @cert
@stream.key = @key if @key
@stream.ca_file = @ca_file if @ca_file
super()
end

# Supported Options
# USERNAME - Username for Mqtt Server
# PASSWORD - Password for Mqtt Server
# CERT - Public Key for Client Cert Auth
# KEY - Private Key for Client Cert Auth
# CA_FILE - Certificate Authority for Client Cert Auth
# (see Interface#set_option)
def set_option(option_name, option_values)
super(option_name, option_values)
case option_name.upcase
when 'USERNAME'
@username = option_values[0]
when 'PASSWORD'
@password = option_values[0]
when 'CERT'
@cert = option_values[0]
when 'KEY'
@key = option_values[0]
when 'CA_FILE'
# CA_FILE must be given as a file
@ca_file = Tempfile.new('ca_file')
@ca_file.write(option_values[0])
@ca_file.close
end
end
end
end
109 changes: 109 additions & 0 deletions openc3/lib/openc3/streams/mqtt_stream.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# encoding: ascii-8bit

# Copyright 2023 OpenC3, Inc.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# This file may also be used under the terms of a commercial license
# if purchased from OpenC3, Inc.

require 'openc3/interfaces/mqtt_interface' # For MQTT patches
require 'openc3/streams/stream'
require 'openc3/config/config_parser'

module OpenC3
class MqttStream < Stream
attr_reader :hostname
attr_reader :port
attr_reader :ssl
attr_reader :write_topic
attr_reader :read_topic
attr_accessor :username
attr_accessor :password
attr_accessor :cert
attr_accessor :key
attr_accessor :ca_file

def initialize(hostname, port = 1883, ssl = false, write_topic = nil, read_topic = nil)
super()

@hostname = hostname
@port = Integer(port)
@ssl = ConfigParser.handle_true_false(ssl)
@write_topic = ConfigParser.handle_nil(write_topic)
@read_topic = ConfigParser.handle_nil(read_topic)
@connected = false

@username = nil
@password = nil
@cert = nil
@key = nil
@ca_file = nil

# Mutex on write is needed to protect from commands coming in from more
# than one tool
@write_mutex = Mutex.new
end

# @return [String] Returns a binary string of data from the read_topic
def read
raise "Attempt to read from write only stream" unless @read_topic

# No read mutex is needed because reads happen serially
_, data = @client.get
if data.nil? or data.length <= 0
Logger.info "MqttStream: read returned nil" if data.nil?
Logger.info "MqttStream: read returned 0 bytes" if not data.nil? and data.length <= 0
return nil
end

return data
end

# @param data [String] A binary string of data to write to the write_topic
def write(data)
raise "Attempt to write to read only stream" unless @write_topic

@write_mutex.synchronize do
@client.publish(@write_topic, data)
end
end

# Connect the stream
def connect
@client = MQTT::Client.new
@client.host = @hostname
@client.port = @port
@client.ssl = @ssl
@client.username = @username if @username
@client.password = @password if @password
@client.cert = @cert if @cert
@client.key = @key if @key
@client.ca_file = @ca_file.path if @ca_file
@client.connect
@client.subscribe(@read_topic) if @read_topic
@connected = true
end

def connected?
@connected
end

def disconnect
if @connected
@client.disconnect
@client = nil
@connected = false
end
end
end
end
1 change: 1 addition & 0 deletions openc3/openc3.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ spec = Gem::Specification.new do |s|
s.add_runtime_dependency 'httpclient', '~> 2.8'
# faraday includes faraday-net_http as the default adapter
s.add_runtime_dependency 'faraday', '~> 2.7'
s.add_runtime_dependency 'faraday-multipart', '~> 1.0'
s.add_runtime_dependency 'faraday-follow_redirects', '~> 0.3'
s.add_runtime_dependency 'aws-sdk-s3', '< 2'
s.add_runtime_dependency 'tzinfo-data', '~> 1.2023'
Expand Down

0 comments on commit ff9d2e1

Please sign in to comment.