Python Examples
The following examples demonstrate how the Sonardyne API Python library can be used to read, write, and stream data in and out of a device compatible with version 2.0.0 of the Sonardyne API, which can be installed by running:
pip install git+https://github.com/Sonardyne/son-idl.git@2.0.0#subdirectory=libraries/python/sonardyne_api
See the Quick Start guide for information on setting up your device for use with the API.
Sensors
The Sensor component represents some physical device which observes data.
The API can be used to get and set the configuration of a particular sensor. This could include, for example, enabling raw data logging for a Doppler Velocity Log (DVL).
Sensors can also send and receive SensorsObservations using the ObservationService.
DVL Sensor
DVL Sensor Configuration
Example code: dvl_sensor_configuration.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.DvlSensorConfiguration(
is_raw_logging_enabled=False
)))
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.DvlSensorConfiguration(
is_raw_logging_enabled=False
)))
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.DvlSensorConfiguration(
is_raw_logging_enabled=False
)))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
"configuration": {
"@type": "type.googleapis.com/sonardyne.api.sensors.dvl_sensor.DvlSensorConfiguration",
"isRawLoggingEnabled": false
}
}
]
}, indent=4).encode())
Modules
The Module component represents some non-algorithm, non-sensor, non-data source component e.g. a BatteryModule
which is responsible for managing a battery and enabling/disabling charging.
Time Module
Time Module Configuration
Example code: time_module_configuration.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.TimeModuleConfiguration(
input_time_sources=son.TimeSourceInputReferences(
input_ntp_time_source=son.NtpTimeSourceReference(set_uid=son.UniqueID(name="NTP Time Source")),
input_zda_time_source=son.ZdaTimeSourceReference(set_none=True),
input_pps_time_source=son.PpsTimeSourceReference(set_none=True)
)
)))
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.TimeModuleConfiguration(
input_time_sources=son.TimeSourceInputReferences(
input_ntp_time_source=son.NtpTimeSourceReference(set_uid=son.UniqueID(name="NTP Time Source")),
input_zda_time_source=son.ZdaTimeSourceReference(set_none=True),
input_pps_time_source=son.PpsTimeSourceReference(set_none=True)
)
)))
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.TimeModuleConfiguration(
input_time_sources=son.TimeSourceInputReferences(
input_ntp_time_source=son.NtpTimeSourceReference(set_uid=son.UniqueID(name="NTP Time Source")),
input_zda_time_source=son.ZdaTimeSourceReference(set_none=True),
input_pps_time_source=son.PpsTimeSourceReference(set_none=True)
)
)))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
"configuration": {
"@type": "type.googleapis.com/sonardyne.api.modules.time_module.TimeModuleConfiguration",
"inputTimeSources": {
"inputZdaTimeSource": {
"setNone": true
},
"inputPpsTimeSource": {
"setNone": true
},
"inputNtpTimeSource": {
"setUid": {
"name": "NTP Time Source"
}
}
}
}
}
]
}, indent=4).encode())
External Logging Module
External Logging Module Configuration
Example code: external_logging_module_configuration.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.ExternalLoggingModuleConfiguration(
ethernet_logging_port=son.EthernetPort(
source_port=8105,
tcp_server=son.TcpServerParameters()
)
)))
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.ExternalLoggingModuleConfiguration(
ethernet_logging_port=son.EthernetPort(
source_port=8105,
tcp_server=son.TcpServerParameters()
)
)))
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.ExternalLoggingModuleConfiguration(
ethernet_logging_port=son.EthernetPort(
source_port=8105,
tcp_server=son.TcpServerParameters()
)
)))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
"configuration": {
"@type": "type.googleapis.com/sonardyne.api.modules.external_logging_module.ExternalLoggingModuleConfiguration",
"ethernetLoggingPort": {
"sourcePort": 8105,
"tcpServer": {}
}
}
}
]
}, indent=4).encode())
Output Message Module
Output Message Module Command Remove Output
Example code: output_message_module_command_remove_output.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.send_command(son.OutputMessageModuleCommandRemoveOutput(
output_message=
son.OutputMessage(
output_message_type=son.OutputMessageType.OUTPUT_MESSAGE_TYPE_GGA,
rate_hz=10.0,
remote_point=son.RemotePointReference(set_uid=son.UniqueID(name="Remote Point 2")),
data_port_output=son.DataPortReference(set_any_ethernet_port=True)
)
)))
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.send_command(son.OutputMessageModuleCommandRemoveOutput(
output_message=
son.OutputMessage(
output_message_type=son.OutputMessageType.OUTPUT_MESSAGE_TYPE_GGA,
rate_hz=10.0,
remote_point=son.RemotePointReference(set_uid=son.UniqueID(name="Remote Point 2")),
data_port_output=son.DataPortReference(set_any_ethernet_port=True)
)
)))
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.send_command(son.OutputMessageModuleCommandRemoveOutput(
output_message=
son.OutputMessage(
output_message_type=son.OutputMessageType.OUTPUT_MESSAGE_TYPE_GGA,
rate_hz=10.0,
remote_point=son.RemotePointReference(set_uid=son.UniqueID(name="Remote Point 2")),
data_port_output=son.DataPortReference(set_any_ethernet_port=True)
)
)))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.command_service.SendCommandRequest",
"command": {
"@type": "type.googleapis.com/sonardyne.api.modules.output_message_module.OutputMessageModuleCommandRemoveOutput",
"outputMessage": {
"outputMessageType": "OUTPUT_MESSAGE_TYPE_GGA",
"dataPortOutput": {
"setAnyEthernetPort": true
},
"rateHz": 10.0,
"remotePoint": {
"setUid": {
"name": "Remote Point 2"
}
}
}
}
}
]
}, indent=4).encode())
Output Message Module Command Add Output
Example code: output_message_module_command_add_output.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.send_command(son.OutputMessageModuleCommandAddOutput(
output_message=
son.OutputMessage(
output_message_type=son.OutputMessageType.OUTPUT_MESSAGE_TYPE_GGA,
rate_hz=20.0,
remote_point=son.RemotePointReference(set_uid=son.UniqueID(name="Remote Point 3")),
data_port_output=son.DataPortReference(set_any_ethernet_port=True)
)
)))
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.send_command(son.OutputMessageModuleCommandAddOutput(
output_message=
son.OutputMessage(
output_message_type=son.OutputMessageType.OUTPUT_MESSAGE_TYPE_GGA,
rate_hz=20.0,
remote_point=son.RemotePointReference(set_uid=son.UniqueID(name="Remote Point 3")),
data_port_output=son.DataPortReference(set_any_ethernet_port=True)
)
)))
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.send_command(son.OutputMessageModuleCommandAddOutput(
output_message=
son.OutputMessage(
output_message_type=son.OutputMessageType.OUTPUT_MESSAGE_TYPE_GGA,
rate_hz=20.0,
remote_point=son.RemotePointReference(set_uid=son.UniqueID(name="Remote Point 3")),
data_port_output=son.DataPortReference(set_any_ethernet_port=True)
)
)))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.command_service.SendCommandRequest",
"command": {
"@type": "type.googleapis.com/sonardyne.api.modules.output_message_module.OutputMessageModuleCommandAddOutput",
"outputMessage": {
"outputMessageType": "OUTPUT_MESSAGE_TYPE_GGA",
"dataPortOutput": {
"setAnyEthernetPort": true
},
"rateHz": 20.0,
"remotePoint": {
"setUid": {
"name": "Remote Point 3"
}
}
}
}
}
]
}, indent=4).encode())
Output Message Module Configuration
Example code: output_message_module_configuration.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.OutputMessageModuleConfiguration(
output_message_list=son.OutputMessageList(
output_messages=[
son.OutputMessage(
output_message_type=son.OutputMessageType.OUTPUT_MESSAGE_TYPE_GGA,
rate_hz=10.0,
remote_point=son.RemotePointReference(set_uid=son.UniqueID(name="Remote Point 2")),
data_port_output=son.DataPortReference(set_any_ethernet_port=True)
),
son.OutputMessage(
output_message_type=son.OutputMessageType.OUTPUT_MESSAGE_TYPE_LNAV,
rate_hz=5.0,
remote_point=son.RemotePointReference(set_uid=son.UniqueID(name="Common Reference Point")),
data_port_output=son.DataPortReference(set_any_ethernet_port=True)
)
]
)
)))
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.OutputMessageModuleConfiguration(
output_message_list=son.OutputMessageList(
output_messages=[
son.OutputMessage(
output_message_type=son.OutputMessageType.OUTPUT_MESSAGE_TYPE_GGA,
rate_hz=10.0,
remote_point=son.RemotePointReference(set_uid=son.UniqueID(name="Remote Point 2")),
data_port_output=son.DataPortReference(set_any_ethernet_port=True)
),
son.OutputMessage(
output_message_type=son.OutputMessageType.OUTPUT_MESSAGE_TYPE_LNAV,
rate_hz=5.0,
remote_point=son.RemotePointReference(set_uid=son.UniqueID(name="Common Reference Point")),
data_port_output=son.DataPortReference(set_any_ethernet_port=True)
)
]
)
)))
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.OutputMessageModuleConfiguration(
output_message_list=son.OutputMessageList(
output_messages=[
son.OutputMessage(
output_message_type=son.OutputMessageType.OUTPUT_MESSAGE_TYPE_GGA,
rate_hz=10.0,
remote_point=son.RemotePointReference(set_uid=son.UniqueID(name="Remote Point 2")),
data_port_output=son.DataPortReference(set_any_ethernet_port=True)
),
son.OutputMessage(
output_message_type=son.OutputMessageType.OUTPUT_MESSAGE_TYPE_LNAV,
rate_hz=5.0,
remote_point=son.RemotePointReference(set_uid=son.UniqueID(name="Common Reference Point")),
data_port_output=son.DataPortReference(set_any_ethernet_port=True)
)
]
)
)))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
"configuration": {
"@type": "type.googleapis.com/sonardyne.api.modules.output_message_module.OutputMessageModuleConfiguration",
"outputMessageList": {
"outputMessages": [
{
"outputMessageType": "OUTPUT_MESSAGE_TYPE_GGA",
"dataPortOutput": {
"setAnyEthernetPort": true
},
"rateHz": 10.0,
"remotePoint": {
"setUid": {
"name": "Remote Point 2"
}
}
},
{
"outputMessageType": "OUTPUT_MESSAGE_TYPE_LNAV",
"dataPortOutput": {
"setAnyEthernetPort": true
},
"rateHz": 5.0,
"remotePoint": {
"setUid": {
"name": "Common Reference Point"
}
}
}
]
}
}
}
]
}, indent=4).encode())
Battery Module
Battery Module Configuration
Example code: battery_module_configuration.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.BatteryModuleConfiguration(
is_battery_charging_enabled=False
)))
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.BatteryModuleConfiguration(
is_battery_charging_enabled=False
)))
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.BatteryModuleConfiguration(
is_battery_charging_enabled=False
)))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
"configuration": {
"@type": "type.googleapis.com/sonardyne.api.modules.battery_module.BatteryModuleConfiguration",
"isBatteryChargingEnabled": false
}
}
]
}, indent=4).encode())
Navigation Module
Navigation Module Observation
Example code: navigation_module_observation.py
import sonardyne_api as son
import time
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
observation_stream = wrapper.open_observation_stream(
son.ObservationSubscriptionRequest(
observation_subscriptions=[
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="NavigationModuleObservation"))
]
)
)
print("\n(listening to observation stream) press ctrl-c to stop stream")
try:
while True:
for message in observation_stream.recv():
print(type(message), message)
print("(listening to observation stream) press ctrl-c to stop stream")
print(time.time())
except KeyboardInterrupt:
pass
import sonardyne_api as son
import time
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
observation_stream = wrapper.open_observation_stream(
son.ObservationSubscriptionRequest(
observation_subscriptions=[
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="NavigationModuleObservation"))
]
)
)
print("\n(listening to observation stream) press ctrl-c to stop stream")
try:
while True:
for message in observation_stream.recv():
print(type(message), message)
print("(listening to observation stream) press ctrl-c to stop stream")
print(time.time())
except KeyboardInterrupt:
pass
import sonardyne_api as son
import time
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
observation_stream = wrapper.open_observation_stream(
son.ObservationSubscriptionRequest(
observation_subscriptions=[
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="NavigationModuleObservation"))
]
)
)
print("\n(listening to observation stream) press ctrl-c to stop stream")
try:
while True:
for message in observation_stream.recv():
print(type(message), message)
print("(listening to observation stream) press ctrl-c to stop stream")
print(time.time())
except KeyboardInterrupt:
pass
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationEnvelope",
"observationMessages": [
{
"@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationSubscriptionRequest",
"observationSubscriptions": [
{
"matchingCriteria": {
"matchTypeNameSuffix": "NavigationModuleObservation"
}
}
]
}
]
}
]
}, indent=4).encode())
while True:
if data := socket.recv(10240):
print(data.decode())
Device Module
Device Module Command Shutdown
Example code: device_module_command_shutdown.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.send_command(son.DeviceModuleCommandShutdown()))
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.send_command(son.DeviceModuleCommandShutdown()))
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.send_command(son.DeviceModuleCommandShutdown()))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.command_service.SendCommandRequest",
"command": {
"@type": "type.googleapis.com/sonardyne.api.modules.device_module.DeviceModuleCommandShutdown"
}
}
]
}, indent=4).encode())
Device Module Command Factory Reset
Example code: device_module_command_factory_reset.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.send_command(son.DeviceModuleCommandFactoryReset()))
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.send_command(son.DeviceModuleCommandFactoryReset()))
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.send_command(son.DeviceModuleCommandFactoryReset()))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.command_service.SendCommandRequest",
"command": {
"@type": "type.googleapis.com/sonardyne.api.modules.device_module.DeviceModuleCommandFactoryReset"
}
}
]
}, indent=4).encode())
Network Module
Network Module Configuration
Example code: network_module_configuration.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.NetworkModuleConfiguration(
subnet_mask="255.255.255.0",
ip_address_list=son.IPAddressList(ip_addresses=["192.168.179.56"]),
gateway_ip="192.168.179.1",
dhcp_enabled=False
)))
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.NetworkModuleConfiguration(
subnet_mask="255.255.255.0",
ip_address_list=son.IPAddressList(ip_addresses=["192.168.179.56"]),
gateway_ip="192.168.179.1",
dhcp_enabled=False
)))
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.NetworkModuleConfiguration(
subnet_mask="255.255.255.0",
ip_address_list=son.IPAddressList(ip_addresses=["192.168.179.56"]),
gateway_ip="192.168.179.1",
dhcp_enabled=False
)))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
"configuration": {
"@type": "type.googleapis.com/sonardyne.api.modules.network_module.NetworkModuleConfiguration",
"ipAddressList": {
"ipAddresses": [
"192.168.179.56"
]
},
"subnetMask": "255.255.255.0",
"gatewayIp": "192.168.179.1",
"dhcpEnabled": false
}
}
]
}, indent=4).encode())
Services
Observation Service
Example code: observation_service.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
observation_stream = wrapper.open_observation_stream(
son.ObservationSubscriptionRequest(observation_subscriptions=[
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="GnssSourceObservation")),
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="XposSourceObservation")),
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="SusblSourceObservation")),
])
)
print("\n(listening to observation stream) press ctrl-c to stop stream")
try:
while True:
for message in observation_stream.recv():
print(type(message), message)
print("(listening to observation stream) press ctrl-c to stop stream")
except KeyboardInterrupt:
pass
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
observation_stream = wrapper.open_observation_stream(
son.ObservationSubscriptionRequest(observation_subscriptions=[
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="GnssSourceObservation")),
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="XposSourceObservation")),
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="SusblSourceObservation")),
])
)
print("\n(listening to observation stream) press ctrl-c to stop stream")
try:
while True:
for message in observation_stream.recv():
print(type(message), message)
print("(listening to observation stream) press ctrl-c to stop stream")
except KeyboardInterrupt:
pass
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
observation_stream = wrapper.open_observation_stream(
son.ObservationSubscriptionRequest(observation_subscriptions=[
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="GnssSourceObservation")),
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="XposSourceObservation")),
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="SusblSourceObservation")),
])
)
print("\n(listening to observation stream) press ctrl-c to stop stream")
try:
while True:
for message in observation_stream.recv():
print(type(message), message)
print("(listening to observation stream) press ctrl-c to stop stream")
except KeyboardInterrupt:
pass
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationEnvelope",
"observationMessages": [
{
"@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationSubscriptionRequest",
"observationSubscriptions": [
{
"matchingCriteria": {
"matchTypeNameSuffix": "GnssSourceObservation"
}
},
{
"matchingCriteria": {
"matchTypeNameSuffix": "XposSourceObservation"
}
},
{
"matchingCriteria": {
"matchTypeNameSuffix": "SusblSourceObservation"
}
}
]
}
]
}
]
}, indent=4).encode())
while True:
if data := socket.recv(10240):
print(data.decode())
Configuration Service
Example code: configuration_service.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print("all configurations:")
configurations = wrapper.get_configurations("Configuration")
for configuration in configurations:
print(f" • {type(configuration).__name__}")
print("all configurations ending in SourceConfiguration:")
configurations = wrapper.get_configurations("SourceConfiguration")
for configuration in configurations:
print(f" • {type(configuration).__name__}")
configuration_stream = wrapper.open_configuration_stream(
son.ConfigurationSubscriptionRequest(configuration_subscriptions=[
son.ConfigurationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="Configuration"))
])
)
print("\n(listening to configuration stream) press ctrl-c to stop stream")
try:
while True:
for message in configuration_stream.recv():
print(type(message), message)
print("(listening to configuration stream) press ctrl-c to stop stream")
except KeyboardInterrupt:
pass
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print("all configurations:")
configurations = wrapper.get_configurations("Configuration")
for configuration in configurations:
print(f" • {type(configuration).__name__}")
print("all configurations ending in SourceConfiguration:")
configurations = wrapper.get_configurations("SourceConfiguration")
for configuration in configurations:
print(f" • {type(configuration).__name__}")
configuration_stream = wrapper.open_configuration_stream(
son.ConfigurationSubscriptionRequest(configuration_subscriptions=[
son.ConfigurationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="Configuration"))
])
)
print("\n(listening to configuration stream) press ctrl-c to stop stream")
try:
while True:
for message in configuration_stream.recv():
print(type(message), message)
print("(listening to configuration stream) press ctrl-c to stop stream")
except KeyboardInterrupt:
pass
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print("all configurations:")
configurations = wrapper.get_configurations("Configuration")
for configuration in configurations:
print(f" • {type(configuration).__name__}")
print("all configurations ending in SourceConfiguration:")
configurations = wrapper.get_configurations("SourceConfiguration")
for configuration in configurations:
print(f" • {type(configuration).__name__}")
configuration_stream = wrapper.open_configuration_stream(
son.ConfigurationSubscriptionRequest(configuration_subscriptions=[
son.ConfigurationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="Configuration"))
])
)
print("\n(listening to configuration stream) press ctrl-c to stop stream")
try:
while True:
for message in configuration_stream.recv():
print(type(message), message)
print("(listening to configuration stream) press ctrl-c to stop stream")
except KeyboardInterrupt:
pass
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.GetConfigurationRequest",
"matchingCriteria": {
"matchTypeNameSuffix": "Configuration"
}
}
]
}, indent=4).encode())
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 2,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.GetConfigurationRequest",
"matchingCriteria": {
"matchTypeNameSuffix": "SourceConfiguration"
}
}
]
}, indent=4).encode())
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.ConfigurationEnvelope",
"configurationMessages": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.ConfigurationSubscriptionRequest",
"configurationSubscriptions": [
{
"matchingCriteria": {
"matchTypeNameSuffix": "Configuration"
}
}
]
}
]
}
]
}, indent=4).encode())
while True:
if data := socket.recv(10240):
print(data.decode())
Command Service
Example code: command_service.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
wrapper.send_command(son.InsAlgorithmCommandReset())
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
wrapper.send_command(son.InsAlgorithmCommandReset())
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
wrapper.send_command(son.InsAlgorithmCommandReset())
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.command_service.SendCommandRequest",
"command": {
"@type": "type.googleapis.com/sonardyne.api.algorithms.ins_algorithm.InsAlgorithmCommandReset"
}
}
]
}, indent=4).encode())
Comms Service
Example code: comms_service.py
import sonardyne_api as son
import time
import random
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
comms_stream = wrapper.open_comms_stream(
son.CommsSubscriptionRequest(comms_subscriptions=[
son.CommsSubscription(matching_criteria=son.MatchingCriteria(match_uids=[son.UniqueID(name="TCP-8110")]))
])
)
# write to stream
print("\n(writing to comms stream) press ctrl-c to stop stream")
try:
while True:
comms_stream.send(
son.DataPortComms(
uid=son.UniqueID(name="TCP-8110"),
data_direction=son.DataDirection.DATA_DIRECTION_OUTPUT,
data=f"Hello World! {random.randrange(100)}".encode()
)
)
time.sleep(1)
except KeyboardInterrupt:
pass
# read from stream
print("\n(listening to comms stream) press ctrl-c to stop stream")
try:
while True:
for message in comms_stream.recv():
print(type(message), message)
print("(listening to comms stream) press ctrl-c to stop stream")
except KeyboardInterrupt:
pass
import sonardyne_api as son
import time
import random
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
comms_stream = wrapper.open_comms_stream(
son.CommsSubscriptionRequest(comms_subscriptions=[
son.CommsSubscription(matching_criteria=son.MatchingCriteria(match_uids=[son.UniqueID(name="TCP-8110")]))
])
)
# write to stream
print("\n(writing to comms stream) press ctrl-c to stop stream")
try:
while True:
comms_stream.send(
son.DataPortComms(
uid=son.UniqueID(name="TCP-8110"),
data_direction=son.DataDirection.DATA_DIRECTION_OUTPUT,
data=f"Hello World! {random.randrange(100)}".encode()
)
)
time.sleep(1)
except KeyboardInterrupt:
pass
# read from stream
print("\n(listening to comms stream) press ctrl-c to stop stream")
try:
while True:
for message in comms_stream.recv():
print(type(message), message)
print("(listening to comms stream) press ctrl-c to stop stream")
except KeyboardInterrupt:
pass
import sonardyne_api as son
import time
import random
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
comms_stream = wrapper.open_comms_stream(
son.CommsSubscriptionRequest(comms_subscriptions=[
son.CommsSubscription(matching_criteria=son.MatchingCriteria(match_uids=[son.UniqueID(name="TCP-8110")]))
])
)
# write to stream
print("\n(writing to comms stream) press ctrl-c to stop stream")
try:
while True:
comms_stream.send(
son.DataPortComms(
uid=son.UniqueID(name="TCP-8110"),
data_direction=son.DataDirection.DATA_DIRECTION_OUTPUT,
data=f"Hello World! {random.randrange(100)}".encode()
)
)
time.sleep(1)
except KeyboardInterrupt:
pass
# read from stream
print("\n(listening to comms stream) press ctrl-c to stop stream")
try:
while True:
for message in comms_stream.recv():
print(type(message), message)
print("(listening to comms stream) press ctrl-c to stop stream")
except KeyboardInterrupt:
pass
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.comms_service.CommsEnvelope",
"commsMessages": [
{
"@type": "type.googleapis.com/sonardyne.api.services.comms_service.CommsSubscriptionRequest",
"commsSubscriptions": [
{
"matchingCriteria": {
"matchUids": [
{
"name": "TCP-8110"
}
]
}
}
]
}
]
}
]
}, indent=4).encode())
while True:
if data := socket.recv(10240):
print(data.decode())
Sources
The Source component represents some "aiding" or "input" source e.g. GNSS or Time.
The API can be used to get and set the configuration of a particular source. This could include, for example, the source's lever arms; the lever arms relate the position of the source to the central reference point of the vessel.
Sources can also send and receive SourceObservations using the ObservationService.
GNSS Source
GNSS Source Configuration
Example code: gnss_source_configuration.py
import sonardyne_api as son
import time
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.GnssSourceConfiguration(
input_data_port=son.DataPortReference(
set_uid=son.UniqueID(name="External Control")
)
)))
import sonardyne_api as son
import time
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.GnssSourceConfiguration(
input_data_port=son.DataPortReference(
set_uid=son.UniqueID(name="External Control")
)
)))
import sonardyne_api as son
import time
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.GnssSourceConfiguration(
input_data_port=son.DataPortReference(
set_uid=son.UniqueID(name="External Control")
)
)))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
"configuration": {
"@type": "type.googleapis.com/sonardyne.api.sources.gnss_source.GnssSourceConfiguration",
"inputDataPort": {
"setUid": {
"name": "External Control"
}
}
}
}
]
}, indent=4).encode())
GNSS Source Observation
Example code: gnss_source_observation.py
import sonardyne_api as son
import time
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
# write to stream
observation_stream = wrapper.open_observation_stream()
print("\n(writing to observation stream) press ctrl-c to stop stream")
try:
while True:
observation_stream.send(son.GnssSourceObservation(gga_telegram=son.GgaTelegram(
time_of_validity=son.Timestamp(common_time_seconds=time.time()),
latitude_radians=0.0,
longitude_radians=0.0,
altitude_metres=0.0,
geoid_separation_metres=0.0,
hdop=0.5,
age_seconds=0.0,
number_of_satellites=10,
reference_station_id=None,
fix_quality=son.GgaTelegram.FIX_QUALITY_GNSS_FIX
)))
time.sleep(1)
except KeyboardInterrupt:
pass
# read from stream
observation_stream = wrapper.open_observation_stream(
son.ObservationSubscriptionRequest(observation_subscriptions=[
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="GnssSourceObservation"))
])
)
print("\n(listening to observation stream) press ctrl-c to stop stream")
try:
while True:
for message in observation_stream.recv():
print(type(message), message)
print("(listening to observation stream) press ctrl-c to stop stream")
except KeyboardInterrupt:
pass
import sonardyne_api as son
import time
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
# write to stream
observation_stream = wrapper.open_observation_stream()
print("\n(writing to observation stream) press ctrl-c to stop stream")
try:
while True:
observation_stream.send(son.GnssSourceObservation(gga_telegram=son.GgaTelegram(
time_of_validity=son.Timestamp(common_time_seconds=time.time()),
latitude_radians=0.0,
longitude_radians=0.0,
altitude_metres=0.0,
geoid_separation_metres=0.0,
hdop=0.5,
age_seconds=0.0,
number_of_satellites=10,
reference_station_id=None,
fix_quality=son.GgaTelegram.FIX_QUALITY_GNSS_FIX
)))
time.sleep(1)
except KeyboardInterrupt:
pass
# read from stream
observation_stream = wrapper.open_observation_stream(
son.ObservationSubscriptionRequest(observation_subscriptions=[
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="GnssSourceObservation"))
])
)
print("\n(listening to observation stream) press ctrl-c to stop stream")
try:
while True:
for message in observation_stream.recv():
print(type(message), message)
print("(listening to observation stream) press ctrl-c to stop stream")
except KeyboardInterrupt:
pass
import sonardyne_api as son
import time
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
# write to stream
observation_stream = wrapper.open_observation_stream()
print("\n(writing to observation stream) press ctrl-c to stop stream")
try:
while True:
observation_stream.send(son.GnssSourceObservation(gga_telegram=son.GgaTelegram(
time_of_validity=son.Timestamp(common_time_seconds=time.time()),
latitude_radians=0.0,
longitude_radians=0.0,
altitude_metres=0.0,
geoid_separation_metres=0.0,
hdop=0.5,
age_seconds=0.0,
number_of_satellites=10,
reference_station_id=None,
fix_quality=son.GgaTelegram.FIX_QUALITY_GNSS_FIX
)))
time.sleep(1)
except KeyboardInterrupt:
pass
# read from stream
observation_stream = wrapper.open_observation_stream(
son.ObservationSubscriptionRequest(observation_subscriptions=[
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="GnssSourceObservation"))
])
)
print("\n(listening to observation stream) press ctrl-c to stop stream")
try:
while True:
for message in observation_stream.recv():
print(type(message), message)
print("(listening to observation stream) press ctrl-c to stop stream")
except KeyboardInterrupt:
pass
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationEnvelope",
"observationMessages": [
{
"@type": "type.googleapis.com/sonardyne.api.sources.gnss_source.GnssSourceObservation",
"ggaTelegram": {
"timeOfValidity": {
"commonTimeSeconds": 1772458213.2997377
},
"hdop": 0.5,
"numberOfSatellites": 10,
"fixQuality": "FIX_QUALITY_GNSS_FIX"
}
}
]
}
]
}, indent=4).encode())
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationEnvelope",
"observationMessages": [
{
"@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationSubscriptionRequest",
"observationSubscriptions": [
{
"matchingCriteria": {
"matchTypeNameSuffix": "GnssSourceObservation"
}
}
]
}
]
}
]
}, indent=4).encode())
while True:
if data := socket.recv(10240):
print(data.decode())
Example code: time_module_configuration.py
import sonardyne_api as son
import time
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.TimeModuleConfiguration(
input_time_sources=son.TimeSourceInputReferences(
input_ntp_time_source=son.NtpTimeSourceReference(set_none=True),
input_zda_time_source=son.ZdaTimeSourceReference(set_any=True),
input_pps_time_source=son.PpsTimeSourceReference(set_none=True)
)
)))
import sonardyne_api as son
import time
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.TimeModuleConfiguration(
input_time_sources=son.TimeSourceInputReferences(
input_ntp_time_source=son.NtpTimeSourceReference(set_none=True),
input_zda_time_source=son.ZdaTimeSourceReference(set_any=True),
input_pps_time_source=son.PpsTimeSourceReference(set_none=True)
)
)))
import sonardyne_api as son
import time
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.TimeModuleConfiguration(
input_time_sources=son.TimeSourceInputReferences(
input_ntp_time_source=son.NtpTimeSourceReference(set_none=True),
input_zda_time_source=son.ZdaTimeSourceReference(set_any=True),
input_pps_time_source=son.PpsTimeSourceReference(set_none=True)
)
)))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
"configuration": {
"@type": "type.googleapis.com/sonardyne.api.modules.time_module.TimeModuleConfiguration",
"inputTimeSources": {
"inputZdaTimeSource": {
"setAny": true
},
"inputPpsTimeSource": {
"setNone": true
},
"inputNtpTimeSource": {
"setNone": true
}
}
}
}
]
}, indent=4).encode())
XPOS Source
XPOS Source Observation
Example code: xpos_source_observation.py
import sonardyne_api as son
import time
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
# write to stream
observation_stream = wrapper.open_observation_stream()
print("\n(writing to observation stream) press ctrl-c to stop stream")
try:
while True:
observation = son.XposSourceObservation(xpos_data=son.XposData(
time_of_validity=son.Timestamp(common_time_seconds=time.time()),
latitude_radians=0.0,
longitude_radians=0.0,
depth_metres=0.0,
depth_uncertainty_metres=1.0,
horizontal_position_uncertainty_metres=0.5,
source=son.XPOS_SOURCE_GNSS
))
observation_stream.send(observation)
time.sleep(1)
except KeyboardInterrupt:
pass
# read from stream
observation_stream = wrapper.open_observation_stream(
son.ObservationSubscriptionRequest(observation_subscriptions=[
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="XposSourceObservation"))
])
)
print("\n(listening to observation stream) press ctrl-c to stop stream")
try:
while True:
for message in observation_stream.recv():
print(type(message), message)
print("(listening to observation stream) press ctrl-c to stop stream")
except KeyboardInterrupt:
pass
import sonardyne_api as son
import time
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
# write to stream
observation_stream = wrapper.open_observation_stream()
print("\n(writing to observation stream) press ctrl-c to stop stream")
try:
while True:
observation = son.XposSourceObservation(xpos_data=son.XposData(
time_of_validity=son.Timestamp(common_time_seconds=time.time()),
latitude_radians=0.0,
longitude_radians=0.0,
depth_metres=0.0,
depth_uncertainty_metres=1.0,
horizontal_position_uncertainty_metres=0.5,
source=son.XPOS_SOURCE_GNSS
))
observation_stream.send(observation)
time.sleep(1)
except KeyboardInterrupt:
pass
# read from stream
observation_stream = wrapper.open_observation_stream(
son.ObservationSubscriptionRequest(observation_subscriptions=[
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="XposSourceObservation"))
])
)
print("\n(listening to observation stream) press ctrl-c to stop stream")
try:
while True:
for message in observation_stream.recv():
print(type(message), message)
print("(listening to observation stream) press ctrl-c to stop stream")
except KeyboardInterrupt:
pass
import sonardyne_api as son
import time
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
# write to stream
observation_stream = wrapper.open_observation_stream()
print("\n(writing to observation stream) press ctrl-c to stop stream")
try:
while True:
observation = son.XposSourceObservation(xpos_data=son.XposData(
time_of_validity=son.Timestamp(common_time_seconds=time.time()),
latitude_radians=0.0,
longitude_radians=0.0,
depth_metres=0.0,
depth_uncertainty_metres=1.0,
horizontal_position_uncertainty_metres=0.5,
source=son.XPOS_SOURCE_GNSS
))
observation_stream.send(observation)
time.sleep(1)
except KeyboardInterrupt:
pass
# read from stream
observation_stream = wrapper.open_observation_stream(
son.ObservationSubscriptionRequest(observation_subscriptions=[
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="XposSourceObservation"))
])
)
print("\n(listening to observation stream) press ctrl-c to stop stream")
try:
while True:
for message in observation_stream.recv():
print(type(message), message)
print("(listening to observation stream) press ctrl-c to stop stream")
except KeyboardInterrupt:
pass
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationEnvelope",
"observationMessages": [
{
"@type": "type.googleapis.com/sonardyne.api.sources.xpos_source.XposSourceObservation",
"xposData": {
"timeOfValidity": {
"commonTimeSeconds": 1772458213.6010525
},
"horizontalPositionUncertaintyMetres": 0.5,
"depthUncertaintyMetres": 1.0,
"source": "XPOS_SOURCE_GNSS"
}
}
]
}
]
}, indent=4).encode())
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationEnvelope",
"observationMessages": [
{
"@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationSubscriptionRequest",
"observationSubscriptions": [
{
"matchingCriteria": {
"matchTypeNameSuffix": "XposSourceObservation"
}
}
]
}
]
}
]
}, indent=4).encode())
while True:
if data := socket.recv(10240):
print(data.decode())
XPOS Source Configuration
Example code: xpos_source_configuration.py
import sonardyne_api as son
import time
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.XposSourceConfiguration(
input_data_port=son.DataPortReference(
set_uid=son.UniqueID(name="External Control")
)
)))
import sonardyne_api as son
import time
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.XposSourceConfiguration(
input_data_port=son.DataPortReference(
set_uid=son.UniqueID(name="External Control")
)
)))
import sonardyne_api as son
import time
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.XposSourceConfiguration(
input_data_port=son.DataPortReference(
set_uid=son.UniqueID(name="External Control")
)
)))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
"configuration": {
"@type": "type.googleapis.com/sonardyne.api.sources.xpos_source.XposSourceConfiguration",
"inputDataPort": {
"setUid": {
"name": "External Control"
}
}
}
}
]
}, indent=4).encode())
SUSBL Source
SUSBL Source Configuration
Example code: susbl_source_configuration.py
import sonardyne_api as son
import time
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.SusblSourceConfiguration(input_data_port=son.DataPortReference(set_uid=son.UniqueID(name="External Control")))))
import sonardyne_api as son
import time
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.SusblSourceConfiguration(input_data_port=son.DataPortReference(set_uid=son.UniqueID(name="External Control")))))
import sonardyne_api as son
import time
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.SusblSourceConfiguration(input_data_port=son.DataPortReference(set_uid=son.UniqueID(name="External Control")))))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
"configuration": {
"@type": "type.googleapis.com/sonardyne.api.sources.susbl_source.SusblSourceConfiguration",
"inputDataPort": {
"setUid": {
"name": "External Control"
}
}
}
}
]
}, indent=4).encode())
SUSBL Source Observation
Example code: susbl_source_observation.py
import sonardyne_api as son
import time
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
# write to stream
observation_stream = wrapper.open_observation_stream()
print("\n(writing to observation stream) press ctrl-c to stop stream")
try:
while True:
observation_stream.send(son.SusblSourceObservation(psimssb_telegram=son.PsimssbTelegram(
time_of_validity=son.Timestamp(common_time_seconds=time.time()),
coordinate_type=son.PsimssbTelegram.PSIMSSB_COORDINATE_SYSTEM_RADIANS,
x_coordinate=0,
y_coordinate=0,
transponder_code=0,
is_valid=True,
orientation_type=son.PsimssbTelegram.PSIMSSB_ORIENTATION_NORTH,
error_code="",
software_filter=son.PsimssbTelegram.PSIMSSB_SOFTWARE_FILTER_UNKNOWN,
depth_metres=0,
position_uncertainty_metres=1
)))
time.sleep(1)
except KeyboardInterrupt:
pass
# read from stream
observation_stream = wrapper.open_observation_stream(
son.ObservationSubscriptionRequest(observation_subscriptions=[
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="SusblSourceObservation"))
])
)
print("\n(listening to observation stream) press ctrl-c to stop stream")
try:
while True:
for message in observation_stream.recv():
print(type(message), message)
print("(listening to observation stream) press ctrl-c to stop stream")
except KeyboardInterrupt:
pass
import sonardyne_api as son
import time
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
# write to stream
observation_stream = wrapper.open_observation_stream()
print("\n(writing to observation stream) press ctrl-c to stop stream")
try:
while True:
observation_stream.send(son.SusblSourceObservation(psimssb_telegram=son.PsimssbTelegram(
time_of_validity=son.Timestamp(common_time_seconds=time.time()),
coordinate_type=son.PsimssbTelegram.PSIMSSB_COORDINATE_SYSTEM_RADIANS,
x_coordinate=0,
y_coordinate=0,
transponder_code=0,
is_valid=True,
orientation_type=son.PsimssbTelegram.PSIMSSB_ORIENTATION_NORTH,
error_code="",
software_filter=son.PsimssbTelegram.PSIMSSB_SOFTWARE_FILTER_UNKNOWN,
depth_metres=0,
position_uncertainty_metres=1
)))
time.sleep(1)
except KeyboardInterrupt:
pass
# read from stream
observation_stream = wrapper.open_observation_stream(
son.ObservationSubscriptionRequest(observation_subscriptions=[
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="SusblSourceObservation"))
])
)
print("\n(listening to observation stream) press ctrl-c to stop stream")
try:
while True:
for message in observation_stream.recv():
print(type(message), message)
print("(listening to observation stream) press ctrl-c to stop stream")
except KeyboardInterrupt:
pass
import sonardyne_api as son
import time
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
# write to stream
observation_stream = wrapper.open_observation_stream()
print("\n(writing to observation stream) press ctrl-c to stop stream")
try:
while True:
observation_stream.send(son.SusblSourceObservation(psimssb_telegram=son.PsimssbTelegram(
time_of_validity=son.Timestamp(common_time_seconds=time.time()),
coordinate_type=son.PsimssbTelegram.PSIMSSB_COORDINATE_SYSTEM_RADIANS,
x_coordinate=0,
y_coordinate=0,
transponder_code=0,
is_valid=True,
orientation_type=son.PsimssbTelegram.PSIMSSB_ORIENTATION_NORTH,
error_code="",
software_filter=son.PsimssbTelegram.PSIMSSB_SOFTWARE_FILTER_UNKNOWN,
depth_metres=0,
position_uncertainty_metres=1
)))
time.sleep(1)
except KeyboardInterrupt:
pass
# read from stream
observation_stream = wrapper.open_observation_stream(
son.ObservationSubscriptionRequest(observation_subscriptions=[
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="SusblSourceObservation"))
])
)
print("\n(listening to observation stream) press ctrl-c to stop stream")
try:
while True:
for message in observation_stream.recv():
print(type(message), message)
print("(listening to observation stream) press ctrl-c to stop stream")
except KeyboardInterrupt:
pass
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationEnvelope",
"observationMessages": [
{
"@type": "type.googleapis.com/sonardyne.api.sources.susbl_source.SusblSourceObservation",
"psimssbTelegram": {
"timeOfValidity": {
"commonTimeSeconds": 1772458214.0693219
},
"coordinateType": "PSIMSSB_COORDINATE_SYSTEM_RADIANS",
"isValid": true,
"orientationType": "PSIMSSB_ORIENTATION_NORTH",
"softwareFilter": "PSIMSSB_SOFTWARE_FILTER_UNKNOWN",
"positionUncertaintyMetres": 1.0
}
}
]
}
]
}, indent=4).encode())
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationEnvelope",
"observationMessages": [
{
"@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationSubscriptionRequest",
"observationSubscriptions": [
{
"matchingCriteria": {
"matchTypeNameSuffix": "SusblSourceObservation"
}
}
]
}
]
}
]
}, indent=4).encode())
while True:
if data := socket.recv(10240):
print(data.decode())
Sound Velocity Source
Sound Velocity Source Observation
Example code: sound_velocity_source_observation.py
import sonardyne_api as son
import time
import random
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
# write to stream
observation_stream = wrapper.open_observation_stream()
print("\n(writing to observation stream) press ctrl-c to stop stream")
try:
while True:
observation = son.SoundVelocitySourceObservation(sound_velocity_data=son.SoundVelocityData(
time_of_validity=son.Timestamp(common_time_seconds=time.time()),
sound_velocity_meters_per_second=1500.0 + random.randrange(-500,500) * 0.01
))
observation_stream.send(observation)
time.sleep(1)
except KeyboardInterrupt:
pass
# read from stream
observation_stream = wrapper.open_observation_stream(
son.ObservationSubscriptionRequest(observation_subscriptions=[
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="SoundVelocitySourceObservation"))
])
)
print("\n(listening to observation stream) press ctrl-c to stop stream")
try:
while True:
for message in observation_stream.recv():
print(type(message), message)
print("(listening to observation stream) press ctrl-c to stop stream")
except KeyboardInterrupt:
pass
import sonardyne_api as son
import time
import random
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
# write to stream
observation_stream = wrapper.open_observation_stream()
print("\n(writing to observation stream) press ctrl-c to stop stream")
try:
while True:
observation = son.SoundVelocitySourceObservation(sound_velocity_data=son.SoundVelocityData(
time_of_validity=son.Timestamp(common_time_seconds=time.time()),
sound_velocity_meters_per_second=1500.0 + random.randrange(-500,500) * 0.01
))
observation_stream.send(observation)
time.sleep(1)
except KeyboardInterrupt:
pass
# read from stream
observation_stream = wrapper.open_observation_stream(
son.ObservationSubscriptionRequest(observation_subscriptions=[
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="SoundVelocitySourceObservation"))
])
)
print("\n(listening to observation stream) press ctrl-c to stop stream")
try:
while True:
for message in observation_stream.recv():
print(type(message), message)
print("(listening to observation stream) press ctrl-c to stop stream")
except KeyboardInterrupt:
pass
import sonardyne_api as son
import time
import random
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
# write to stream
observation_stream = wrapper.open_observation_stream()
print("\n(writing to observation stream) press ctrl-c to stop stream")
try:
while True:
observation = son.SoundVelocitySourceObservation(sound_velocity_data=son.SoundVelocityData(
time_of_validity=son.Timestamp(common_time_seconds=time.time()),
sound_velocity_meters_per_second=1500.0 + random.randrange(-500,500) * 0.01
))
observation_stream.send(observation)
time.sleep(1)
except KeyboardInterrupt:
pass
# read from stream
observation_stream = wrapper.open_observation_stream(
son.ObservationSubscriptionRequest(observation_subscriptions=[
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="SoundVelocitySourceObservation"))
])
)
print("\n(listening to observation stream) press ctrl-c to stop stream")
try:
while True:
for message in observation_stream.recv():
print(type(message), message)
print("(listening to observation stream) press ctrl-c to stop stream")
except KeyboardInterrupt:
pass
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationEnvelope",
"observationMessages": [
{
"@type": "type.googleapis.com/sonardyne.api.sources.sound_velocity_source.SoundVelocitySourceObservation",
"soundVelocityData": {
"timeOfValidity": {
"commonTimeSeconds": 1772458214.2173042
},
"soundVelocityMetersPerSecond": 1503.64
}
}
]
}
]
}, indent=4).encode())
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationEnvelope",
"observationMessages": [
{
"@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationSubscriptionRequest",
"observationSubscriptions": [
{
"matchingCriteria": {
"matchTypeNameSuffix": "SoundVelocitySourceObservation"
}
}
]
}
]
}
]
}, indent=4).encode())
while True:
if data := socket.recv(10240):
print(data.decode())
Sound Velocity Source Configuration
Example code: sound_velocity_source_configuration.py
import sonardyne_api as son
import time
import random
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.SoundVelocitySourceConfiguration(
input_sound_velocity_data_source_reference=son.SoundVelocityDataSourceReference(set_salinity_parts_per_thousand=100)))
)
print(wrapper.set_configuration(son.SoundVelocitySourceConfiguration(
input_sound_velocity_data_source_reference=son.SoundVelocityDataSourceReference(set_manual_sound_velocity_metres_per_second=150)))
)
print(wrapper.set_configuration(son.SoundVelocitySourceConfiguration(
input_sound_velocity_data_source_reference=son.SoundVelocityDataSourceReference(set_any_ethernet_port=True)))
)
print(wrapper.set_configuration(son.SoundVelocitySourceConfiguration(
input_sound_velocity_data_source_reference=son.SoundVelocityDataSourceReference(set_any_external_control_port=True)))
)
import sonardyne_api as son
import time
import random
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.SoundVelocitySourceConfiguration(
input_sound_velocity_data_source_reference=son.SoundVelocityDataSourceReference(set_salinity_parts_per_thousand=100)))
)
print(wrapper.set_configuration(son.SoundVelocitySourceConfiguration(
input_sound_velocity_data_source_reference=son.SoundVelocityDataSourceReference(set_manual_sound_velocity_metres_per_second=150)))
)
print(wrapper.set_configuration(son.SoundVelocitySourceConfiguration(
input_sound_velocity_data_source_reference=son.SoundVelocityDataSourceReference(set_any_ethernet_port=True)))
)
print(wrapper.set_configuration(son.SoundVelocitySourceConfiguration(
input_sound_velocity_data_source_reference=son.SoundVelocityDataSourceReference(set_any_external_control_port=True)))
)
import sonardyne_api as son
import time
import random
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.SoundVelocitySourceConfiguration(
input_sound_velocity_data_source_reference=son.SoundVelocityDataSourceReference(set_salinity_parts_per_thousand=100)))
)
print(wrapper.set_configuration(son.SoundVelocitySourceConfiguration(
input_sound_velocity_data_source_reference=son.SoundVelocityDataSourceReference(set_manual_sound_velocity_metres_per_second=150)))
)
print(wrapper.set_configuration(son.SoundVelocitySourceConfiguration(
input_sound_velocity_data_source_reference=son.SoundVelocityDataSourceReference(set_any_ethernet_port=True)))
)
print(wrapper.set_configuration(son.SoundVelocitySourceConfiguration(
input_sound_velocity_data_source_reference=son.SoundVelocityDataSourceReference(set_any_external_control_port=True)))
)
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
"configuration": {
"@type": "type.googleapis.com/sonardyne.api.sources.sound_velocity_source.SoundVelocitySourceConfiguration",
"inputSoundVelocityDataSourceReference": {
"setSalinityPartsPerThousand": 100.0
}
}
}
]
}, indent=4).encode())
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 2,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
"configuration": {
"@type": "type.googleapis.com/sonardyne.api.sources.sound_velocity_source.SoundVelocitySourceConfiguration",
"inputSoundVelocityDataSourceReference": {
"setManualSoundVelocityMetresPerSecond": 150.0
}
}
}
]
}, indent=4).encode())
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 3,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
"configuration": {
"@type": "type.googleapis.com/sonardyne.api.sources.sound_velocity_source.SoundVelocitySourceConfiguration",
"inputSoundVelocityDataSourceReference": {
"setAnyEthernetPort": true
}
}
}
]
}, indent=4).encode())
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 4,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
"configuration": {
"@type": "type.googleapis.com/sonardyne.api.sources.sound_velocity_source.SoundVelocitySourceConfiguration",
"inputSoundVelocityDataSourceReference": {
"setAnyExternalControlPort": true
}
}
}
]
}, indent=4).encode())
Time Source
PPS Time Source Configuration
Example code: pps_time_source_configuration.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.PpsTimeSourceConfiguration(input_trigger_port=son.TriggerPortReference(set_any=True))))
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.PpsTimeSourceConfiguration(input_trigger_port=son.TriggerPortReference(set_any=True))))
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.PpsTimeSourceConfiguration(input_trigger_port=son.TriggerPortReference(set_any=True))))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
"configuration": {
"@type": "type.googleapis.com/sonardyne.api.sources.time_source.PpsTimeSourceConfiguration",
"inputTriggerPort": {
"setAny": true
}
}
}
]
}, indent=4).encode())
NTP Time Source Configuration
Example code: ntp_time_source_configuration.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.NtpTimeSourceConfiguration(ntp_server_ip_address="230.229.228.227")))
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.NtpTimeSourceConfiguration(ntp_server_ip_address="230.229.228.227")))
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.NtpTimeSourceConfiguration(ntp_server_ip_address="230.229.228.227")))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
"configuration": {
"@type": "type.googleapis.com/sonardyne.api.sources.time_source.NtpTimeSourceConfiguration",
"ntpServerIpAddress": "230.229.228.227"
}
}
]
}, indent=4).encode())
ZDA Time Source Observation
Example code: zda_time_source_observation.py
import sonardyne_api as son
import time
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
# write to stream
observation_stream = wrapper.open_observation_stream()
print("\n(writing to observation stream) press ctrl-c to stop stream")
try:
while True:
observation_stream.send(son.ZdaTimeSourceObservation(zda_data=son.ZdaData(current_utc_time_seconds = time.time())))
time.sleep(1)
except KeyboardInterrupt:
pass
import sonardyne_api as son
import time
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
# write to stream
observation_stream = wrapper.open_observation_stream()
print("\n(writing to observation stream) press ctrl-c to stop stream")
try:
while True:
observation_stream.send(son.ZdaTimeSourceObservation(zda_data=son.ZdaData(current_utc_time_seconds = time.time())))
time.sleep(1)
except KeyboardInterrupt:
pass
import sonardyne_api as son
import time
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
# write to stream
observation_stream = wrapper.open_observation_stream()
print("\n(writing to observation stream) press ctrl-c to stop stream")
try:
while True:
observation_stream.send(son.ZdaTimeSourceObservation(zda_data=son.ZdaData(current_utc_time_seconds = time.time())))
time.sleep(1)
except KeyboardInterrupt:
pass
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationEnvelope",
"observationMessages": [
{
"@type": "type.googleapis.com/sonardyne.api.sources.time_source.ZdaTimeSourceObservation",
"zdaData": {
"currentUtcTimeSeconds": 1772458214.925132
}
}
]
}
]
}, indent=4).encode())
ZDA Time Source Configuration
Example code: zda_time_source_configuration.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.ZdaTimeSourceConfiguration(input_data_port=son.DataPortReference(set_any_ethernet_port=True))))
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.ZdaTimeSourceConfiguration(input_data_port=son.DataPortReference(set_any_ethernet_port=True))))
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.ZdaTimeSourceConfiguration(input_data_port=son.DataPortReference(set_any_ethernet_port=True))))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
"configuration": {
"@type": "type.googleapis.com/sonardyne.api.sources.time_source.ZdaTimeSourceConfiguration",
"inputDataPort": {
"setAnyEthernetPort": true
}
}
}
]
}, indent=4).encode())
Depth Source
Depth Source Configuration
Example code: depth_source_configuration.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.DepthSourceConfiguration(
input_depth_data_source_reference=son.DepthDataSourceReference(
set_uid=son.UniqueID(name="TCP-8110")
),
lever_arms=son.LeverArms(forward_metres=1.5, starboard_metres=2, down_metres=3.2)
)))
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.DepthSourceConfiguration(
input_depth_data_source_reference=son.DepthDataSourceReference(
set_uid=son.UniqueID(name="TCP-8110")
),
lever_arms=son.LeverArms(forward_metres=1.5, starboard_metres=2, down_metres=3.2)
)))
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.DepthSourceConfiguration(
input_depth_data_source_reference=son.DepthDataSourceReference(
set_uid=son.UniqueID(name="TCP-8110")
),
lever_arms=son.LeverArms(forward_metres=1.5, starboard_metres=2, down_metres=3.2)
)))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
"configuration": {
"@type": "type.googleapis.com/sonardyne.api.sources.depth_source.DepthSourceConfiguration",
"inputDepthDataSourceReference": {
"setUid": {
"name": "TCP-8110"
}
},
"leverArms": {
"forwardMetres": 1.5,
"starboardMetres": 2.0,
"downMetres": 3.2
}
}
}
]
}, indent=4).encode())
Ports
The Port List component represents some collection of data inputs/outputs.
Some port lists, e.g. ethernet port lists, can have entries added and removed. Other port lists, e.g. serial port lists, represent physical ports on the device, so are fixed-length.
On a data port such as a SerialPort or an EthernetPort, it is possible to send and receive PortComms using the CommsService.
Serial Port
Serial Port List Configuration
Example code: serial_port_list_configuration.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.SerialPortListConfiguration(serial_ports=[
son.SerialPort(baud_rate=son.SerialPort.BAUD_RATE_9600),
son.SerialPort(baud_rate=son.SerialPort.BAUD_RATE_38400),
])))
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.SerialPortListConfiguration(serial_ports=[
son.SerialPort(baud_rate=son.SerialPort.BAUD_RATE_9600),
son.SerialPort(baud_rate=son.SerialPort.BAUD_RATE_38400),
])))
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.SerialPortListConfiguration(serial_ports=[
son.SerialPort(baud_rate=son.SerialPort.BAUD_RATE_9600),
son.SerialPort(baud_rate=son.SerialPort.BAUD_RATE_38400),
])))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
"configuration": {
"@type": "type.googleapis.com/sonardyne.api.port_lists.serial_port_list.SerialPortListConfiguration",
"serialPorts": [
{
"baudRate": "BAUD_RATE_9600"
},
{
"baudRate": "BAUD_RATE_38400"
}
]
}
}
]
}, indent=4).encode())
Ethernet Port
Ethernet Port List Configuration
Example code: ethernet_port_list_configuration.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as device: # SPRINT-Nav ip address & port
print(device.set_configuration(son.EthernetPortListConfiguration(ethernet_ports=[
son.EthernetPort(source_port=8110, tcp_server=son.TcpServerParameters()),
son.EthernetPort(source_port=8111, tcp_client=son.TcpClientParameters(server_ip_address="192.168.179.60")),
son.EthernetPort(source_port=8112, udp_socket=son.UdpSocketParameters(destination_ip_address="192.168.179.70")),
])))
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as device: # SPRINT-Nav ip address & port
print(device.set_configuration(son.EthernetPortListConfiguration(ethernet_ports=[
son.EthernetPort(source_port=8110, tcp_server=son.TcpServerParameters()),
son.EthernetPort(source_port=8111, tcp_client=son.TcpClientParameters(server_ip_address="192.168.179.60")),
son.EthernetPort(source_port=8112, udp_socket=son.UdpSocketParameters(destination_ip_address="192.168.179.70")),
])))
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as device: # SPRINT-Nav ip address & port
print(device.set_configuration(son.EthernetPortListConfiguration(ethernet_ports=[
son.EthernetPort(source_port=8110, tcp_server=son.TcpServerParameters()),
son.EthernetPort(source_port=8111, tcp_client=son.TcpClientParameters(server_ip_address="192.168.179.60")),
son.EthernetPort(source_port=8112, udp_socket=son.UdpSocketParameters(destination_ip_address="192.168.179.70")),
])))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
"configuration": {
"@type": "type.googleapis.com/sonardyne.api.port_lists.ethernet_port_list.EthernetPortListConfiguration",
"ethernetPorts": [
{
"sourcePort": 8110,
"tcpServer": {}
},
{
"sourcePort": 8111,
"tcpClient": {
"serverIpAddress": "192.168.179.60"
}
},
{
"sourcePort": 8112,
"udpSocket": {
"destinationIpAddress": "192.168.179.70"
}
}
]
}
}
]
}, indent=4).encode())
Trigger Port
Trigger Port List Configuration
Example code: trigger_port_list_configuration.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.get_configuration(son.TriggerPortListConfiguration))
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.get_configuration(son.TriggerPortListConfiguration))
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.get_configuration(son.TriggerPortListConfiguration))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.GetConfigurationRequest",
"matchingCriteria": {
"matchTypeNameSuffix": "TriggerPortListConfiguration"
}
}
]
}, indent=4).encode())
External Control Port
External Control Port List Configuration
Example code: external_control_port_list_configuration.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.get_configuration(son.ExternalControlPortListConfiguration))
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.get_configuration(son.ExternalControlPortListConfiguration))
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.get_configuration(son.ExternalControlPortListConfiguration))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.GetConfigurationRequest",
"matchingCriteria": {
"matchTypeNameSuffix": "ExternalControlPortListConfiguration"
}
}
]
}, indent=4).encode())
Power Pass Port
Power Pass Port List Configuration
Example code: power_pass_port_list_configuration.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.PowerPassPortListConfiguration(power_pass_ports=[
son.PowerPassPort(power_pass_state=son.PowerPassPort.POWER_PASS_STATE_TRIPPED),
son.PowerPassPort(power_pass_state=son.PowerPassPort.POWER_PASS_STATE_ENABLED)
])))
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.PowerPassPortListConfiguration(power_pass_ports=[
son.PowerPassPort(power_pass_state=son.PowerPassPort.POWER_PASS_STATE_TRIPPED),
son.PowerPassPort(power_pass_state=son.PowerPassPort.POWER_PASS_STATE_ENABLED)
])))
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.PowerPassPortListConfiguration(power_pass_ports=[
son.PowerPassPort(power_pass_state=son.PowerPassPort.POWER_PASS_STATE_TRIPPED),
son.PowerPassPort(power_pass_state=son.PowerPassPort.POWER_PASS_STATE_ENABLED)
])))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
"configuration": {
"@type": "type.googleapis.com/sonardyne.api.port_lists.power_pass_port_list.PowerPassPortListConfiguration",
"powerPassPorts": [
{
"powerPassState": "POWER_PASS_STATE_TRIPPED"
},
{
"powerPassState": "POWER_PASS_STATE_ENABLED"
}
]
}
}
]
}, indent=4).encode())
Algorithms
The Algorithm component represents some internal algorithm e.g. an INS algorithm.
INS Algorithm
INS Algorithm Command Reset
Example code: ins_algorithm_command_reset.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.send_command(son.InsAlgorithmCommandReset()))
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.send_command(son.InsAlgorithmCommandReset()))
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.send_command(son.InsAlgorithmCommandReset()))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.command_service.SendCommandRequest",
"command": {
"@type": "type.googleapis.com/sonardyne.api.algorithms.ins_algorithm.InsAlgorithmCommandReset"
}
}
]
}, indent=4).encode())
INS Algorithm Configuration
Example code: ins_algorithm_configuration.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.InsAlgorithmConfiguration(
is_gnss_enabled=True,
is_xpos_enabled=False
)))
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.InsAlgorithmConfiguration(
is_gnss_enabled=True,
is_xpos_enabled=False
)))
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.InsAlgorithmConfiguration(
is_gnss_enabled=True,
is_xpos_enabled=False
)))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
"configuration": {
"@type": "type.googleapis.com/sonardyne.api.algorithms.ins_algorithm.InsAlgorithmConfiguration",
"isGnssEnabled": true,
"isXposEnabled": false
}
}
]
}, indent=4).encode())
INS Algorithm Observation
Example code: ins_algorithm_observation.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
observation_stream = wrapper.open_observation_stream(
son.ObservationSubscriptionRequest(
observation_subscriptions=[
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="AidingObservation"))
]
)
)
print("\n(listening to observation stream) press ctrl-c to stop stream")
try:
while True:
for message in observation_stream.recv():
print(type(message), message)
print("(listening to observation stream) press ctrl-c to stop stream")
except KeyboardInterrupt:
pass
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
observation_stream = wrapper.open_observation_stream(
son.ObservationSubscriptionRequest(
observation_subscriptions=[
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="AidingObservation"))
]
)
)
print("\n(listening to observation stream) press ctrl-c to stop stream")
try:
while True:
for message in observation_stream.recv():
print(type(message), message)
print("(listening to observation stream) press ctrl-c to stop stream")
except KeyboardInterrupt:
pass
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
observation_stream = wrapper.open_observation_stream(
son.ObservationSubscriptionRequest(
observation_subscriptions=[
son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix="AidingObservation"))
]
)
)
print("\n(listening to observation stream) press ctrl-c to stop stream")
try:
while True:
for message in observation_stream.recv():
print(type(message), message)
print("(listening to observation stream) press ctrl-c to stop stream")
except KeyboardInterrupt:
pass
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationEnvelope",
"observationMessages": [
{
"@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationSubscriptionRequest",
"observationSubscriptions": [
{
"matchingCriteria": {
"matchTypeNameSuffix": "AidingObservation"
}
}
]
}
]
}
]
}, indent=4).encode())
while True:
if data := socket.recv(10240):
print(data.decode())
DVL Algorithm
DVL Algorithm Configuration
Example code: dvl_algorithm_configuration.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.DvlAlgorithmConfiguration(
dvl_rate=son.DvlRate(value=son.DvlRate.DvlRateEnum.DVL_RATE_ENUM_FIXED_5HZ),
dvl_mode=son.DvlMode(value=son.DvlMode.DVL_MODE_ENUM_DVL),
adcp_parameters=son.DvlAdcpParameters(
number_of_cells=son.BoundedUInt32(value=100),
preferred_cell_width_metres=son.BoundedDouble(value=0.1),
pings_to_average=son.BoundedUInt32(value=10),
dvl_adcp_ping_ratio=son.BoundedUInt32(value=10),
velocity_limit=son.AdcpVelocityLimit(value=son.AdcpVelocityLimit.ADCP_VELOCITY_LIMIT_ENUM_STANDARD)
),
input_trigger_parameters=son.DvlInputTriggerParameters(
input_trigger_port=son.TriggerPortReference(set_uid=son.UniqueID(name="Trigger A1")),
input_trigger_edge=son.TRIGGER_EDGE_RISING,
input_trigger_delay_seconds=son.BoundedDouble(value=0.1)
)
)))
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.DvlAlgorithmConfiguration(
dvl_rate=son.DvlRate(value=son.DvlRate.DvlRateEnum.DVL_RATE_ENUM_FIXED_5HZ),
dvl_mode=son.DvlMode(value=son.DvlMode.DVL_MODE_ENUM_DVL),
adcp_parameters=son.DvlAdcpParameters(
number_of_cells=son.BoundedUInt32(value=100),
preferred_cell_width_metres=son.BoundedDouble(value=0.1),
pings_to_average=son.BoundedUInt32(value=10),
dvl_adcp_ping_ratio=son.BoundedUInt32(value=10),
velocity_limit=son.AdcpVelocityLimit(value=son.AdcpVelocityLimit.ADCP_VELOCITY_LIMIT_ENUM_STANDARD)
),
input_trigger_parameters=son.DvlInputTriggerParameters(
input_trigger_port=son.TriggerPortReference(set_uid=son.UniqueID(name="Trigger A1")),
input_trigger_edge=son.TRIGGER_EDGE_RISING,
input_trigger_delay_seconds=son.BoundedDouble(value=0.1)
)
)))
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.DvlAlgorithmConfiguration(
dvl_rate=son.DvlRate(value=son.DvlRate.DvlRateEnum.DVL_RATE_ENUM_FIXED_5HZ),
dvl_mode=son.DvlMode(value=son.DvlMode.DVL_MODE_ENUM_DVL),
adcp_parameters=son.DvlAdcpParameters(
number_of_cells=son.BoundedUInt32(value=100),
preferred_cell_width_metres=son.BoundedDouble(value=0.1),
pings_to_average=son.BoundedUInt32(value=10),
dvl_adcp_ping_ratio=son.BoundedUInt32(value=10),
velocity_limit=son.AdcpVelocityLimit(value=son.AdcpVelocityLimit.ADCP_VELOCITY_LIMIT_ENUM_STANDARD)
),
input_trigger_parameters=son.DvlInputTriggerParameters(
input_trigger_port=son.TriggerPortReference(set_uid=son.UniqueID(name="Trigger A1")),
input_trigger_edge=son.TRIGGER_EDGE_RISING,
input_trigger_delay_seconds=son.BoundedDouble(value=0.1)
)
)))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
"configuration": {
"@type": "type.googleapis.com/sonardyne.api.algorithms.dvl_algorithm.DvlAlgorithmConfiguration",
"dvlRate": {
"value": "DVL_RATE_ENUM_FIXED_5HZ"
},
"dvlMode": {
"value": "DVL_MODE_ENUM_DVL"
},
"inputTriggerParameters": {
"inputTriggerPort": {
"setUid": {
"name": "Trigger A1"
}
},
"inputTriggerEdge": "TRIGGER_EDGE_RISING",
"inputTriggerDelaySeconds": {
"value": 0.1
}
},
"adcpParameters": {
"numberOfCells": {
"value": 100
},
"preferredCellWidthMetres": {
"value": 0.1
},
"pingsToAverage": {
"value": 10
},
"dvlAdcpPingRatio": {
"value": 10
},
"velocityLimit": {
"value": "ADCP_VELOCITY_LIMIT_ENUM_STANDARD"
}
}
}
}
]
}, indent=4).encode())
AHRS Algorithm
AHRS Algorithm Configuration
Example code: ahrs_algorithm_configuration.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.AhrsAlgorithmConfiguration(
coarse_position_latitude_radians=0.1,
coarse_position_longitude_radians=0.2
)))
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.AhrsAlgorithmConfiguration(
coarse_position_latitude_radians=0.1,
coarse_position_longitude_radians=0.2
)))
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.set_configuration(son.AhrsAlgorithmConfiguration(
coarse_position_latitude_radians=0.1,
coarse_position_longitude_radians=0.2
)))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
"configuration": {
"@type": "type.googleapis.com/sonardyne.api.algorithms.ahrs_algorithm.AhrsAlgorithmConfiguration",
"coarsePositionLatitudeRadians": 0.1,
"coarsePositionLongitudeRadians": 0.2
}
}
]
}, indent=4).encode())
AHRS Algorithm Command Reset
Example code: ahrs_algorithm_command_reset.py
import sonardyne_api as son
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
print(wrapper.send_command(son.AhrsAlgorithmCommandReset()))
import sonardyne_api as son
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.send_command(son.AhrsAlgorithmCommandReset()))
import sonardyne_api as son
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
print(wrapper.send_command(son.AhrsAlgorithmCommandReset()))
import json, socket
socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.connect(('0.0.0.0', 8103))
socket.sendall(json.dumps(
{
"@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
"uid": 1,
"requests": [
{
"@type": "type.googleapis.com/sonardyne.api.services.command_service.SendCommandRequest",
"command": {
"@type": "type.googleapis.com/sonardyne.api.algorithms.ahrs_algorithm.AhrsAlgorithmCommandReset"
}
}
]
}, indent=4).encode())