Skip to content

Python Examples

The following examples demonstrate how the Sonardyne API Python library can be used to read, write, and stream data to and from a device compatible with version 2.0.0 of the Sonardyne API. The library can be installed by running:

pip install git+https://github.com/Sonardyne/son-idl.git@2.0.0#subdirectory=libraries/python/sonardyne_api

See the Quick Start guide for information on setting up your device for use with the API.

Sensors

The Sensor component represents some physical device which observes data.

The API can be used to get and set the configuration of a particular sensor. This could include, for example, enabling raw data logging for a Doppler Velocity Log (DVL).

Sensors can also send and receive SensorsObservations using the ObservationService.

DVL Sensor

DVL Sensor Configuration

Example code: dvl_sensor_configuration.py

import sonardyne_api as son

# Connect with WrapperGrpc, turn off DVL raw data logging, and print whatever
# set_configuration returns.
with son.WrapperGrpc('0.0.0.0', 8103) as grpc_wrapper:
    dvl_configuration = son.DvlSensorConfiguration(is_raw_logging_enabled=False)
    result = grpc_wrapper.set_configuration(dvl_configuration)
    print(result)
import sonardyne_api as son

# Connect with WrapperFramedTcp, turn off DVL raw data logging, and print
# whatever set_configuration returns.
with son.WrapperFramedTcp('0.0.0.0', 8103) as framed_wrapper:
    dvl_configuration = son.DvlSensorConfiguration(is_raw_logging_enabled=False)
    result = framed_wrapper.set_configuration(dvl_configuration)
    print(result)
import sonardyne_api as son

# Connect with WrapperJsonTcp, turn off DVL raw data logging, and print
# whatever set_configuration returns.
with son.WrapperJsonTcp('0.0.0.0', 8103) as json_wrapper:
    dvl_configuration = son.DvlSensorConfiguration(is_raw_logging_enabled=False)
    result = json_wrapper.set_configuration(dvl_configuration)
    print(result)
import json, socket

# Raw-socket variant: the request envelope is written out by hand and sent as
# JSON over a plain TCP connection.
# Booleans must be spelled True/False in Python; json.dumps serialises them as
# the JSON literals true/false (a bare `false` raises NameError).
request_envelope = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
            "configuration": {
                "@type": "type.googleapis.com/sonardyne.api.sensors.dvl_sensor.DvlSensorConfiguration",
                "isRawLoggingEnabled": False
            }
        }
    ]
}

# Bind the connection to its own name so the socket module is not shadowed,
# and let the with-block close it once the request has been sent.
# NOTE(review): '0.0.0.0' looks like a placeholder for the device address — confirm.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
    connection.connect(('0.0.0.0', 8103))
    connection.sendall(json.dumps(request_envelope, indent=4).encode())

Modules

The Module component represents a component that is not an algorithm, sensor, or data source, e.g. a BatteryModule, which is responsible for managing a battery and enabling/disabling charging.

Time Module

Time Module Configuration

Example code: time_module_configuration.py

import sonardyne_api as son

# Over WrapperGrpc: select the NTP time source by name, set the ZDA and PPS
# inputs to none, and print whatever set_configuration returns.
with son.WrapperGrpc('0.0.0.0', 8103) as grpc_wrapper:
    ntp_reference = son.NtpTimeSourceReference(set_uid=son.UniqueID(name="NTP Time Source"))
    zda_reference = son.ZdaTimeSourceReference(set_none=True)
    pps_reference = son.PpsTimeSourceReference(set_none=True)
    time_sources = son.TimeSourceInputReferences(
        input_ntp_time_source=ntp_reference,
        input_zda_time_source=zda_reference,
        input_pps_time_source=pps_reference,
    )
    time_configuration = son.TimeModuleConfiguration(input_time_sources=time_sources)
    print(grpc_wrapper.set_configuration(time_configuration))
import sonardyne_api as son

# Over WrapperFramedTcp: select the NTP time source by name, set the ZDA and
# PPS inputs to none, and print whatever set_configuration returns.
with son.WrapperFramedTcp('0.0.0.0', 8103) as framed_wrapper:
    ntp_reference = son.NtpTimeSourceReference(set_uid=son.UniqueID(name="NTP Time Source"))
    zda_reference = son.ZdaTimeSourceReference(set_none=True)
    pps_reference = son.PpsTimeSourceReference(set_none=True)
    time_sources = son.TimeSourceInputReferences(
        input_ntp_time_source=ntp_reference,
        input_zda_time_source=zda_reference,
        input_pps_time_source=pps_reference,
    )
    time_configuration = son.TimeModuleConfiguration(input_time_sources=time_sources)
    print(framed_wrapper.set_configuration(time_configuration))
import sonardyne_api as son

# Over WrapperJsonTcp: select the NTP time source by name, set the ZDA and PPS
# inputs to none, and print whatever set_configuration returns.
with son.WrapperJsonTcp('0.0.0.0', 8103) as json_wrapper:
    ntp_reference = son.NtpTimeSourceReference(set_uid=son.UniqueID(name="NTP Time Source"))
    zda_reference = son.ZdaTimeSourceReference(set_none=True)
    pps_reference = son.PpsTimeSourceReference(set_none=True)
    time_sources = son.TimeSourceInputReferences(
        input_ntp_time_source=ntp_reference,
        input_zda_time_source=zda_reference,
        input_pps_time_source=pps_reference,
    )
    time_configuration = son.TimeModuleConfiguration(input_time_sources=time_sources)
    print(json_wrapper.set_configuration(time_configuration))
import json, socket

# Raw-socket variant: the request envelope is written out by hand and sent as
# JSON over a plain TCP connection.
# Booleans must be spelled True/False in Python; json.dumps serialises them as
# the JSON literals true/false (a bare `true` raises NameError).
request_envelope = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
            "configuration": {
                "@type": "type.googleapis.com/sonardyne.api.modules.time_module.TimeModuleConfiguration",
                "inputTimeSources": {
                    "inputZdaTimeSource": {
                        "setNone": True
                    },
                    "inputPpsTimeSource": {
                        "setNone": True
                    },
                    "inputNtpTimeSource": {
                        "setUid": {
                            "name": "NTP Time Source"
                        }
                    }
                }
            }
        }
    ]
}

# Bind the connection to its own name so the socket module is not shadowed,
# and let the with-block close it once the request has been sent.
# NOTE(review): '0.0.0.0' looks like a placeholder for the device address — confirm.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
    connection.connect(('0.0.0.0', 8103))
    connection.sendall(json.dumps(request_envelope, indent=4).encode())

External Logging Module

External Logging Module Configuration

Example code: external_logging_module_configuration.py

import sonardyne_api as son

# Over WrapperGrpc: set the external logging Ethernet port (source port 8105
# with default TcpServerParameters) and print what set_configuration returns.
with son.WrapperGrpc('0.0.0.0', 8103) as grpc_wrapper:
    logging_port = son.EthernetPort(source_port=8105, tcp_server=son.TcpServerParameters())
    logging_configuration = son.ExternalLoggingModuleConfiguration(ethernet_logging_port=logging_port)
    print(grpc_wrapper.set_configuration(logging_configuration))
import sonardyne_api as son

# Over WrapperFramedTcp: set the external logging Ethernet port (source port
# 8105 with default TcpServerParameters) and print what set_configuration
# returns.
with son.WrapperFramedTcp('0.0.0.0', 8103) as framed_wrapper:
    logging_port = son.EthernetPort(source_port=8105, tcp_server=son.TcpServerParameters())
    logging_configuration = son.ExternalLoggingModuleConfiguration(ethernet_logging_port=logging_port)
    print(framed_wrapper.set_configuration(logging_configuration))
import sonardyne_api as son

# Over WrapperJsonTcp: set the external logging Ethernet port (source port
# 8105 with default TcpServerParameters) and print what set_configuration
# returns.
with son.WrapperJsonTcp('0.0.0.0', 8103) as json_wrapper:
    logging_port = son.EthernetPort(source_port=8105, tcp_server=son.TcpServerParameters())
    logging_configuration = son.ExternalLoggingModuleConfiguration(ethernet_logging_port=logging_port)
    print(json_wrapper.set_configuration(logging_configuration))
import json, socket

# Raw-socket variant: the request envelope is written out by hand and sent as
# JSON over a plain TCP connection.
request_envelope = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
            "configuration": {
                "@type": "type.googleapis.com/sonardyne.api.modules.external_logging_module.ExternalLoggingModuleConfiguration",
                "ethernetLoggingPort": {
                    "sourcePort": 8105,
                    "tcpServer": {}
                }
            }
        }
    ]
}

# Bind the connection to its own name so the socket module is not shadowed,
# and let the with-block close it once the request has been sent.
# NOTE(review): '0.0.0.0' looks like a placeholder for the device address — confirm.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
    connection.connect(('0.0.0.0', 8103))
    connection.sendall(json.dumps(request_envelope, indent=4).encode())

Output Message Module

Output Message Module Command Remove Output

Example code: output_message_module_command_remove_output.py

import sonardyne_api as son

# Over WrapperGrpc: send the remove-output command for the 10 Hz GGA message
# on "Remote Point 2" and print whatever send_command returns.
with son.WrapperGrpc('0.0.0.0', 8103) as grpc_wrapper:
    gga_output = son.OutputMessage(
        output_message_type=son.OutputMessageType.OUTPUT_MESSAGE_TYPE_GGA,
        rate_hz=10.0,
        remote_point=son.RemotePointReference(set_uid=son.UniqueID(name="Remote Point 2")),
        data_port_output=son.DataPortReference(set_any_ethernet_port=True),
    )
    remove_command = son.OutputMessageModuleCommandRemoveOutput(output_message=gga_output)
    print(grpc_wrapper.send_command(remove_command))
import sonardyne_api as son

# Over WrapperFramedTcp: send the remove-output command for the 10 Hz GGA
# message on "Remote Point 2" and print whatever send_command returns.
with son.WrapperFramedTcp('0.0.0.0', 8103) as framed_wrapper:
    gga_output = son.OutputMessage(
        output_message_type=son.OutputMessageType.OUTPUT_MESSAGE_TYPE_GGA,
        rate_hz=10.0,
        remote_point=son.RemotePointReference(set_uid=son.UniqueID(name="Remote Point 2")),
        data_port_output=son.DataPortReference(set_any_ethernet_port=True),
    )
    remove_command = son.OutputMessageModuleCommandRemoveOutput(output_message=gga_output)
    print(framed_wrapper.send_command(remove_command))
import sonardyne_api as son

# Over WrapperJsonTcp: send the remove-output command for the 10 Hz GGA
# message on "Remote Point 2" and print whatever send_command returns.
with son.WrapperJsonTcp('0.0.0.0', 8103) as json_wrapper:
    gga_output = son.OutputMessage(
        output_message_type=son.OutputMessageType.OUTPUT_MESSAGE_TYPE_GGA,
        rate_hz=10.0,
        remote_point=son.RemotePointReference(set_uid=son.UniqueID(name="Remote Point 2")),
        data_port_output=son.DataPortReference(set_any_ethernet_port=True),
    )
    remove_command = son.OutputMessageModuleCommandRemoveOutput(output_message=gga_output)
    print(json_wrapper.send_command(remove_command))
import json, socket

# Raw-socket variant: the request envelope is written out by hand and sent as
# JSON over a plain TCP connection.
# Booleans must be spelled True/False in Python; json.dumps serialises them as
# the JSON literals true/false (a bare `true` raises NameError).
request_envelope = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.command_service.SendCommandRequest",
            "command": {
                "@type": "type.googleapis.com/sonardyne.api.modules.output_message_module.OutputMessageModuleCommandRemoveOutput",
                "outputMessage": {
                    "outputMessageType": "OUTPUT_MESSAGE_TYPE_GGA",
                    "dataPortOutput": {
                        "setAnyEthernetPort": True
                    },
                    "rateHz": 10.0,
                    "remotePoint": {
                        "setUid": {
                            "name": "Remote Point 2"
                        }
                    }
                }
            }
        }
    ]
}

# Bind the connection to its own name so the socket module is not shadowed,
# and let the with-block close it once the request has been sent.
# NOTE(review): '0.0.0.0' looks like a placeholder for the device address — confirm.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
    connection.connect(('0.0.0.0', 8103))
    connection.sendall(json.dumps(request_envelope, indent=4).encode())

Output Message Module Command Add Output

Example code: output_message_module_command_add_output.py

import sonardyne_api as son

# Over WrapperGrpc: send the add-output command for a 20 Hz GGA message on
# "Remote Point 3" and print whatever send_command returns.
with son.WrapperGrpc('0.0.0.0', 8103) as grpc_wrapper:
    gga_output = son.OutputMessage(
        output_message_type=son.OutputMessageType.OUTPUT_MESSAGE_TYPE_GGA,
        rate_hz=20.0,
        remote_point=son.RemotePointReference(set_uid=son.UniqueID(name="Remote Point 3")),
        data_port_output=son.DataPortReference(set_any_ethernet_port=True),
    )
    add_command = son.OutputMessageModuleCommandAddOutput(output_message=gga_output)
    print(grpc_wrapper.send_command(add_command))
import sonardyne_api as son

# Over WrapperFramedTcp: send the add-output command for a 20 Hz GGA message
# on "Remote Point 3" and print whatever send_command returns.
with son.WrapperFramedTcp('0.0.0.0', 8103) as framed_wrapper:
    gga_output = son.OutputMessage(
        output_message_type=son.OutputMessageType.OUTPUT_MESSAGE_TYPE_GGA,
        rate_hz=20.0,
        remote_point=son.RemotePointReference(set_uid=son.UniqueID(name="Remote Point 3")),
        data_port_output=son.DataPortReference(set_any_ethernet_port=True),
    )
    add_command = son.OutputMessageModuleCommandAddOutput(output_message=gga_output)
    print(framed_wrapper.send_command(add_command))
import sonardyne_api as son

# Over WrapperJsonTcp: send the add-output command for a 20 Hz GGA message on
# "Remote Point 3" and print whatever send_command returns.
with son.WrapperJsonTcp('0.0.0.0', 8103) as json_wrapper:
    gga_output = son.OutputMessage(
        output_message_type=son.OutputMessageType.OUTPUT_MESSAGE_TYPE_GGA,
        rate_hz=20.0,
        remote_point=son.RemotePointReference(set_uid=son.UniqueID(name="Remote Point 3")),
        data_port_output=son.DataPortReference(set_any_ethernet_port=True),
    )
    add_command = son.OutputMessageModuleCommandAddOutput(output_message=gga_output)
    print(json_wrapper.send_command(add_command))
import json, socket

# Raw-socket variant: the request envelope is written out by hand and sent as
# JSON over a plain TCP connection.
# Booleans must be spelled True/False in Python; json.dumps serialises them as
# the JSON literals true/false (a bare `true` raises NameError).
request_envelope = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.command_service.SendCommandRequest",
            "command": {
                "@type": "type.googleapis.com/sonardyne.api.modules.output_message_module.OutputMessageModuleCommandAddOutput",
                "outputMessage": {
                    "outputMessageType": "OUTPUT_MESSAGE_TYPE_GGA",
                    "dataPortOutput": {
                        "setAnyEthernetPort": True
                    },
                    "rateHz": 20.0,
                    "remotePoint": {
                        "setUid": {
                            "name": "Remote Point 3"
                        }
                    }
                }
            }
        }
    ]
}

# Bind the connection to its own name so the socket module is not shadowed,
# and let the with-block close it once the request has been sent.
# NOTE(review): '0.0.0.0' looks like a placeholder for the device address — confirm.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
    connection.connect(('0.0.0.0', 8103))
    connection.sendall(json.dumps(request_envelope, indent=4).encode())

Output Message Module Configuration

Example code: output_message_module_configuration.py

import sonardyne_api as son

# Over WrapperGrpc: set the output message list to a 10 Hz GGA message and a
# 5 Hz LNAV message, then print whatever set_configuration returns.
with son.WrapperGrpc('0.0.0.0', 8103) as grpc_wrapper:
    gga_output = son.OutputMessage(
        output_message_type=son.OutputMessageType.OUTPUT_MESSAGE_TYPE_GGA,
        rate_hz=10.0,
        remote_point=son.RemotePointReference(set_uid=son.UniqueID(name="Remote Point 2")),
        data_port_output=son.DataPortReference(set_any_ethernet_port=True),
    )
    lnav_output = son.OutputMessage(
        output_message_type=son.OutputMessageType.OUTPUT_MESSAGE_TYPE_LNAV,
        rate_hz=5.0,
        remote_point=son.RemotePointReference(set_uid=son.UniqueID(name="Common Reference Point")),
        data_port_output=son.DataPortReference(set_any_ethernet_port=True),
    )
    message_list = son.OutputMessageList(output_messages=[gga_output, lnav_output])
    output_configuration = son.OutputMessageModuleConfiguration(output_message_list=message_list)
    print(grpc_wrapper.set_configuration(output_configuration))
import sonardyne_api as son

# Over WrapperFramedTcp: set the output message list to a 10 Hz GGA message
# and a 5 Hz LNAV message, then print whatever set_configuration returns.
with son.WrapperFramedTcp('0.0.0.0', 8103) as framed_wrapper:
    gga_output = son.OutputMessage(
        output_message_type=son.OutputMessageType.OUTPUT_MESSAGE_TYPE_GGA,
        rate_hz=10.0,
        remote_point=son.RemotePointReference(set_uid=son.UniqueID(name="Remote Point 2")),
        data_port_output=son.DataPortReference(set_any_ethernet_port=True),
    )
    lnav_output = son.OutputMessage(
        output_message_type=son.OutputMessageType.OUTPUT_MESSAGE_TYPE_LNAV,
        rate_hz=5.0,
        remote_point=son.RemotePointReference(set_uid=son.UniqueID(name="Common Reference Point")),
        data_port_output=son.DataPortReference(set_any_ethernet_port=True),
    )
    message_list = son.OutputMessageList(output_messages=[gga_output, lnav_output])
    output_configuration = son.OutputMessageModuleConfiguration(output_message_list=message_list)
    print(framed_wrapper.set_configuration(output_configuration))
import sonardyne_api as son

# Over WrapperJsonTcp: set the output message list to a 10 Hz GGA message and
# a 5 Hz LNAV message, then print whatever set_configuration returns.
with son.WrapperJsonTcp('0.0.0.0', 8103) as json_wrapper:
    gga_output = son.OutputMessage(
        output_message_type=son.OutputMessageType.OUTPUT_MESSAGE_TYPE_GGA,
        rate_hz=10.0,
        remote_point=son.RemotePointReference(set_uid=son.UniqueID(name="Remote Point 2")),
        data_port_output=son.DataPortReference(set_any_ethernet_port=True),
    )
    lnav_output = son.OutputMessage(
        output_message_type=son.OutputMessageType.OUTPUT_MESSAGE_TYPE_LNAV,
        rate_hz=5.0,
        remote_point=son.RemotePointReference(set_uid=son.UniqueID(name="Common Reference Point")),
        data_port_output=son.DataPortReference(set_any_ethernet_port=True),
    )
    message_list = son.OutputMessageList(output_messages=[gga_output, lnav_output])
    output_configuration = son.OutputMessageModuleConfiguration(output_message_list=message_list)
    print(json_wrapper.set_configuration(output_configuration))
import json, socket

# Raw-socket variant: the request envelope is written out by hand and sent as
# JSON over a plain TCP connection.
# Booleans must be spelled True/False in Python; json.dumps serialises them as
# the JSON literals true/false (a bare `true` raises NameError).
request_envelope = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
            "configuration": {
                "@type": "type.googleapis.com/sonardyne.api.modules.output_message_module.OutputMessageModuleConfiguration",
                "outputMessageList": {
                    "outputMessages": [
                        {
                            "outputMessageType": "OUTPUT_MESSAGE_TYPE_GGA",
                            "dataPortOutput": {
                                "setAnyEthernetPort": True
                            },
                            "rateHz": 10.0,
                            "remotePoint": {
                                "setUid": {
                                    "name": "Remote Point 2"
                                }
                            }
                        },
                        {
                            "outputMessageType": "OUTPUT_MESSAGE_TYPE_LNAV",
                            "dataPortOutput": {
                                "setAnyEthernetPort": True
                            },
                            "rateHz": 5.0,
                            "remotePoint": {
                                "setUid": {
                                    "name": "Common Reference Point"
                                }
                            }
                        }
                    ]
                }
            }
        }
    ]
}

# Bind the connection to its own name so the socket module is not shadowed,
# and let the with-block close it once the request has been sent.
# NOTE(review): '0.0.0.0' looks like a placeholder for the device address — confirm.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
    connection.connect(('0.0.0.0', 8103))
    connection.sendall(json.dumps(request_envelope, indent=4).encode())

Battery Module

Battery Module Configuration

Example code: battery_module_configuration.py

import sonardyne_api as son

# Over WrapperGrpc: set is_battery_charging_enabled to False and print
# whatever set_configuration returns.
with son.WrapperGrpc('0.0.0.0', 8103) as grpc_wrapper:
    battery_configuration = son.BatteryModuleConfiguration(is_battery_charging_enabled=False)
    result = grpc_wrapper.set_configuration(battery_configuration)
    print(result)
import sonardyne_api as son

# Over WrapperFramedTcp: set is_battery_charging_enabled to False and print
# whatever set_configuration returns.
with son.WrapperFramedTcp('0.0.0.0', 8103) as framed_wrapper:
    battery_configuration = son.BatteryModuleConfiguration(is_battery_charging_enabled=False)
    result = framed_wrapper.set_configuration(battery_configuration)
    print(result)
import sonardyne_api as son

# Over WrapperJsonTcp: set is_battery_charging_enabled to False and print
# whatever set_configuration returns.
with son.WrapperJsonTcp('0.0.0.0', 8103) as json_wrapper:
    battery_configuration = son.BatteryModuleConfiguration(is_battery_charging_enabled=False)
    result = json_wrapper.set_configuration(battery_configuration)
    print(result)
import json, socket

# Raw-socket variant: the request envelope is written out by hand and sent as
# JSON over a plain TCP connection.
# Booleans must be spelled True/False in Python; json.dumps serialises them as
# the JSON literals true/false (a bare `false` raises NameError).
request_envelope = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
            "configuration": {
                "@type": "type.googleapis.com/sonardyne.api.modules.battery_module.BatteryModuleConfiguration",
                "isBatteryChargingEnabled": False
            }
        }
    ]
}

# Bind the connection to its own name so the socket module is not shadowed,
# and let the with-block close it once the request has been sent.
# NOTE(review): '0.0.0.0' looks like a placeholder for the device address — confirm.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
    connection.connect(('0.0.0.0', 8103))
    connection.sendall(json.dumps(request_envelope, indent=4).encode())

Navigation Module Observation

Example code: navigation_module_observation.py

import sonardyne_api as son
import time

# Over WrapperGrpc: subscribe to observations whose type name ends in
# "NavigationModuleObservation" and print each one, followed by the current
# time.time(), until ctrl-c.
with son.WrapperGrpc('0.0.0.0', 8103) as grpc_wrapper:
    navigation_criteria = son.MatchingCriteria(match_type_name_suffix="NavigationModuleObservation")
    subscription_request = son.ObservationSubscriptionRequest(
        observation_subscriptions=[son.ObservationSubscription(matching_criteria=navigation_criteria)]
    )
    stream = grpc_wrapper.open_observation_stream(subscription_request)
    print("\n(listening to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            for observation in stream.recv():
                print(type(observation), observation)
                print("(listening to observation stream) press ctrl-c to stop stream")
                print(time.time())
    except KeyboardInterrupt:
        # ctrl-c is the intended way to leave the loop.
        pass
import sonardyne_api as son
import time

# Over WrapperFramedTcp: subscribe to observations whose type name ends in
# "NavigationModuleObservation" and print each one, followed by the current
# time.time(), until ctrl-c.
with son.WrapperFramedTcp('0.0.0.0', 8103) as framed_wrapper:
    navigation_criteria = son.MatchingCriteria(match_type_name_suffix="NavigationModuleObservation")
    subscription_request = son.ObservationSubscriptionRequest(
        observation_subscriptions=[son.ObservationSubscription(matching_criteria=navigation_criteria)]
    )
    stream = framed_wrapper.open_observation_stream(subscription_request)
    print("\n(listening to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            for observation in stream.recv():
                print(type(observation), observation)
                print("(listening to observation stream) press ctrl-c to stop stream")
                print(time.time())
    except KeyboardInterrupt:
        # ctrl-c is the intended way to leave the loop.
        pass
import sonardyne_api as son
import time

# Over WrapperJsonTcp: subscribe to observations whose type name ends in
# "NavigationModuleObservation" and print each one, followed by the current
# time.time(), until ctrl-c.
with son.WrapperJsonTcp('0.0.0.0', 8103) as json_wrapper:
    navigation_criteria = son.MatchingCriteria(match_type_name_suffix="NavigationModuleObservation")
    subscription_request = son.ObservationSubscriptionRequest(
        observation_subscriptions=[son.ObservationSubscription(matching_criteria=navigation_criteria)]
    )
    stream = json_wrapper.open_observation_stream(subscription_request)
    print("\n(listening to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            for observation in stream.recv():
                print(type(observation), observation)
                print("(listening to observation stream) press ctrl-c to stop stream")
                print(time.time())
    except KeyboardInterrupt:
        # ctrl-c is the intended way to leave the loop.
        pass
import json, socket

# Raw-socket variant: send a hand-written subscription request as JSON over a
# plain TCP connection, then print whatever the peer sends back.
request_envelope = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationEnvelope",
            "observationMessages": [
                {
                    "@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationSubscriptionRequest",
                    "observationSubscriptions": [
                        {
                            "matchingCriteria": {
                                "matchTypeNameSuffix": "NavigationModuleObservation"
                            }
                        }
                    ]
                }
            ]
        }
    ]
}

# Bind the connection to its own name so the socket module is not shadowed,
# and let the with-block close it when the loop ends.
# NOTE(review): '0.0.0.0' looks like a placeholder for the device address — confirm.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
    connection.connect(('0.0.0.0', 8103))
    connection.sendall(json.dumps(request_envelope, indent=4).encode())
    try:
        # recv() returns b"" once the peer closes the connection; stop then
        # instead of spinning forever on empty reads.
        while data := connection.recv(10240):
            print(data.decode())
    except KeyboardInterrupt:
        # ctrl-c ends the stream, matching the wrapper-based examples.
        pass

Device Module

Device Module Command Shutdown

Example code: device_module_command_shutdown.py

import sonardyne_api as son

# Over WrapperGrpc: send DeviceModuleCommandShutdown and print whatever
# send_command returns.
with son.WrapperGrpc('0.0.0.0', 8103) as grpc_wrapper:
    shutdown_command = son.DeviceModuleCommandShutdown()
    result = grpc_wrapper.send_command(shutdown_command)
    print(result)
import sonardyne_api as son

# Over WrapperFramedTcp: send DeviceModuleCommandShutdown and print whatever
# send_command returns.
with son.WrapperFramedTcp('0.0.0.0', 8103) as framed_wrapper:
    shutdown_command = son.DeviceModuleCommandShutdown()
    result = framed_wrapper.send_command(shutdown_command)
    print(result)
import sonardyne_api as son

# Over WrapperJsonTcp: send DeviceModuleCommandShutdown and print whatever
# send_command returns.
with son.WrapperJsonTcp('0.0.0.0', 8103) as json_wrapper:
    shutdown_command = son.DeviceModuleCommandShutdown()
    result = json_wrapper.send_command(shutdown_command)
    print(result)
import json, socket

# Raw-socket variant: the request envelope is written out by hand and sent as
# JSON over a plain TCP connection.
request_envelope = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.command_service.SendCommandRequest",
            "command": {
                "@type": "type.googleapis.com/sonardyne.api.modules.device_module.DeviceModuleCommandShutdown"
            }
        }
    ]
}

# Bind the connection to its own name so the socket module is not shadowed,
# and let the with-block close it once the request has been sent.
# NOTE(review): '0.0.0.0' looks like a placeholder for the device address — confirm.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
    connection.connect(('0.0.0.0', 8103))
    connection.sendall(json.dumps(request_envelope, indent=4).encode())

Device Module Command Factory Reset

Example code: device_module_command_factory_reset.py

import sonardyne_api as son

# Over WrapperGrpc: send DeviceModuleCommandFactoryReset and print whatever
# send_command returns.
with son.WrapperGrpc('0.0.0.0', 8103) as grpc_wrapper:
    factory_reset_command = son.DeviceModuleCommandFactoryReset()
    result = grpc_wrapper.send_command(factory_reset_command)
    print(result)
import sonardyne_api as son

# Over WrapperFramedTcp: send DeviceModuleCommandFactoryReset and print
# whatever send_command returns.
with son.WrapperFramedTcp('0.0.0.0', 8103) as framed_wrapper:
    factory_reset_command = son.DeviceModuleCommandFactoryReset()
    result = framed_wrapper.send_command(factory_reset_command)
    print(result)
import sonardyne_api as son

# Over WrapperJsonTcp: send DeviceModuleCommandFactoryReset and print whatever
# send_command returns.
with son.WrapperJsonTcp('0.0.0.0', 8103) as json_wrapper:
    factory_reset_command = son.DeviceModuleCommandFactoryReset()
    result = json_wrapper.send_command(factory_reset_command)
    print(result)
import json, socket

# Raw-socket variant: the request envelope is written out by hand and sent as
# JSON over a plain TCP connection.
request_envelope = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.command_service.SendCommandRequest",
            "command": {
                "@type": "type.googleapis.com/sonardyne.api.modules.device_module.DeviceModuleCommandFactoryReset"
            }
        }
    ]
}

# Bind the connection to its own name so the socket module is not shadowed,
# and let the with-block close it once the request has been sent.
# NOTE(review): '0.0.0.0' looks like a placeholder for the device address — confirm.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
    connection.connect(('0.0.0.0', 8103))
    connection.sendall(json.dumps(request_envelope, indent=4).encode())

Network Module

Network Module Configuration

Example code: network_module_configuration.py

import sonardyne_api as son

# Over WrapperGrpc: set a static address (DHCP off) with the given subnet mask
# and gateway, then print whatever set_configuration returns.
with son.WrapperGrpc('0.0.0.0', 8103) as grpc_wrapper:
    network_configuration = son.NetworkModuleConfiguration(
        subnet_mask="255.255.255.0",
        ip_address_list=son.IPAddressList(ip_addresses=["192.168.179.56"]),
        gateway_ip="192.168.179.1",
        dhcp_enabled=False,
    )
    result = grpc_wrapper.set_configuration(network_configuration)
    print(result)
import sonardyne_api as son

# Over WrapperFramedTcp: set a static address (DHCP off) with the given subnet
# mask and gateway, then print whatever set_configuration returns.
with son.WrapperFramedTcp('0.0.0.0', 8103) as framed_wrapper:
    network_configuration = son.NetworkModuleConfiguration(
        subnet_mask="255.255.255.0",
        ip_address_list=son.IPAddressList(ip_addresses=["192.168.179.56"]),
        gateway_ip="192.168.179.1",
        dhcp_enabled=False,
    )
    result = framed_wrapper.set_configuration(network_configuration)
    print(result)
import sonardyne_api as son

# Over WrapperJsonTcp: set a static address (DHCP off) with the given subnet
# mask and gateway, then print whatever set_configuration returns.
with son.WrapperJsonTcp('0.0.0.0', 8103) as json_wrapper:
    network_configuration = son.NetworkModuleConfiguration(
        subnet_mask="255.255.255.0",
        ip_address_list=son.IPAddressList(ip_addresses=["192.168.179.56"]),
        gateway_ip="192.168.179.1",
        dhcp_enabled=False,
    )
    result = json_wrapper.set_configuration(network_configuration)
    print(result)
import json, socket

# Raw-socket variant: the request envelope is written out by hand and sent as
# JSON over a plain TCP connection.
# Booleans must be spelled True/False in Python; json.dumps serialises them as
# the JSON literals true/false (a bare `false` raises NameError).
request_envelope = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
            "configuration": {
                "@type": "type.googleapis.com/sonardyne.api.modules.network_module.NetworkModuleConfiguration",
                "ipAddressList": {
                    "ipAddresses": [
                        "192.168.179.56"
                    ]
                },
                "subnetMask": "255.255.255.0",
                "gatewayIp": "192.168.179.1",
                "dhcpEnabled": False
            }
        }
    ]
}

# Bind the connection to its own name so the socket module is not shadowed,
# and let the with-block close it once the request has been sent.
# NOTE(review): '0.0.0.0' looks like a placeholder for the device address — confirm.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
    connection.connect(('0.0.0.0', 8103))
    connection.sendall(json.dumps(request_envelope, indent=4).encode())

Services

Observation Service

Example code: observation_service.py

import sonardyne_api as son

# Over WrapperGrpc: subscribe to GNSS, XPOS and SUSBL source observations by
# type-name suffix and print every received message until ctrl-c.
with son.WrapperGrpc('0.0.0.0', 8103) as grpc_wrapper:
    # One subscription per suffix, built in the listed order.
    subscriptions = [
        son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix=suffix))
        for suffix in ("GnssSourceObservation", "XposSourceObservation", "SusblSourceObservation")
    ]
    stream = grpc_wrapper.open_observation_stream(
        son.ObservationSubscriptionRequest(observation_subscriptions=subscriptions)
    )
    print("\n(listening to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            for observation in stream.recv():
                print(type(observation), observation)
                print("(listening to observation stream) press ctrl-c to stop stream")
    except KeyboardInterrupt:
        # ctrl-c is the intended way to leave the loop.
        pass
import sonardyne_api as son

# Over WrapperFramedTcp: subscribe to GNSS, XPOS and SUSBL source observations
# by type-name suffix and print every received message until ctrl-c.
with son.WrapperFramedTcp('0.0.0.0', 8103) as framed_wrapper:
    # One subscription per suffix, built in the listed order.
    subscriptions = [
        son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix=suffix))
        for suffix in ("GnssSourceObservation", "XposSourceObservation", "SusblSourceObservation")
    ]
    stream = framed_wrapper.open_observation_stream(
        son.ObservationSubscriptionRequest(observation_subscriptions=subscriptions)
    )
    print("\n(listening to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            for observation in stream.recv():
                print(type(observation), observation)
                print("(listening to observation stream) press ctrl-c to stop stream")
    except KeyboardInterrupt:
        # ctrl-c is the intended way to leave the loop.
        pass
import sonardyne_api as son

# Over WrapperJsonTcp: subscribe to GNSS, XPOS and SUSBL source observations
# by type-name suffix and print every received message until ctrl-c.
with son.WrapperJsonTcp('0.0.0.0', 8103) as json_wrapper:
    # One subscription per suffix, built in the listed order.
    subscriptions = [
        son.ObservationSubscription(matching_criteria=son.MatchingCriteria(match_type_name_suffix=suffix))
        for suffix in ("GnssSourceObservation", "XposSourceObservation", "SusblSourceObservation")
    ]
    stream = json_wrapper.open_observation_stream(
        son.ObservationSubscriptionRequest(observation_subscriptions=subscriptions)
    )
    print("\n(listening to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            for observation in stream.recv():
                print(type(observation), observation)
                print("(listening to observation stream) press ctrl-c to stop stream")
    except KeyboardInterrupt:
        # ctrl-c is the intended way to leave the loop.
        pass
import json, socket

# Raw-socket variant: send a hand-written subscription request as JSON over a
# plain TCP connection, then print whatever the peer sends back.
request_envelope = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationEnvelope",
            "observationMessages": [
                {
                    "@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationSubscriptionRequest",
                    "observationSubscriptions": [
                        {
                            "matchingCriteria": {
                                "matchTypeNameSuffix": "GnssSourceObservation"
                            }
                        },
                        {
                            "matchingCriteria": {
                                "matchTypeNameSuffix": "XposSourceObservation"
                            }
                        },
                        {
                            "matchingCriteria": {
                                "matchTypeNameSuffix": "SusblSourceObservation"
                            }
                        }
                    ]
                }
            ]
        }
    ]
}

# Bind the connection to its own name so the socket module is not shadowed,
# and let the with-block close it when the loop ends.
# NOTE(review): '0.0.0.0' looks like a placeholder for the device address — confirm.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
    connection.connect(('0.0.0.0', 8103))
    connection.sendall(json.dumps(request_envelope, indent=4).encode())
    try:
        # recv() returns b"" once the peer closes the connection; stop then
        # instead of spinning forever on empty reads.
        while data := connection.recv(10240):
            print(data.decode())
    except KeyboardInterrupt:
        # ctrl-c ends the stream, matching the wrapper-based examples.
        pass

Configuration Service

Example code: configuration_service.py

import sonardyne_api as son

# Over WrapperGrpc: list the type names of the configurations matching two
# suffixes, then stream configurations and print each message until ctrl-c.
with son.WrapperGrpc('0.0.0.0', 8103) as grpc_wrapper:

    # Each pair is (heading to print, suffix passed to get_configurations).
    for heading, suffix in (
        ("all configurations:", "Configuration"),
        ("all configurations ending in SourceConfiguration:", "SourceConfiguration"),
    ):
        print(heading)
        for configuration in grpc_wrapper.get_configurations(suffix):
            print(f"    • {type(configuration).__name__}")

    any_configuration = son.MatchingCriteria(match_type_name_suffix="Configuration")
    subscription_request = son.ConfigurationSubscriptionRequest(
        configuration_subscriptions=[son.ConfigurationSubscription(matching_criteria=any_configuration)]
    )
    stream = grpc_wrapper.open_configuration_stream(subscription_request)
    print("\n(listening to configuration stream) press ctrl-c to stop stream")
    try:
        while True:
            for update in stream.recv():
                print(type(update), update)
                print("(listening to configuration stream) press ctrl-c to stop stream")
    except KeyboardInterrupt:
        # ctrl-c is the intended way to leave the loop.
        pass
import sonardyne_api as son

# Over WrapperFramedTcp: list the type names of the configurations matching
# two suffixes, then stream configurations and print each message until ctrl-c.
with son.WrapperFramedTcp('0.0.0.0', 8103) as framed_wrapper:

    # Each pair is (heading to print, suffix passed to get_configurations).
    for heading, suffix in (
        ("all configurations:", "Configuration"),
        ("all configurations ending in SourceConfiguration:", "SourceConfiguration"),
    ):
        print(heading)
        for configuration in framed_wrapper.get_configurations(suffix):
            print(f"    • {type(configuration).__name__}")

    any_configuration = son.MatchingCriteria(match_type_name_suffix="Configuration")
    subscription_request = son.ConfigurationSubscriptionRequest(
        configuration_subscriptions=[son.ConfigurationSubscription(matching_criteria=any_configuration)]
    )
    stream = framed_wrapper.open_configuration_stream(subscription_request)
    print("\n(listening to configuration stream) press ctrl-c to stop stream")
    try:
        while True:
            for update in stream.recv():
                print(type(update), update)
                print("(listening to configuration stream) press ctrl-c to stop stream")
    except KeyboardInterrupt:
        # ctrl-c is the intended way to leave the loop.
        pass
import sonardyne_api as son

with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    # One-shot queries: print the type name of every configuration returned
    # for each type-name suffix.
    for heading, suffix in (
        ("all configurations:", "Configuration"),
        ("all configurations ending in SourceConfiguration:", "SourceConfiguration"),
    ):
        print(heading)
        for found in wrapper.get_configurations(suffix):
            print(f"    • {type(found).__name__}")

    # Streaming: subscribe using the type-name suffix "Configuration".
    suffix_filter = son.MatchingCriteria(match_type_name_suffix="Configuration")
    subscription = son.ConfigurationSubscription(matching_criteria=suffix_filter)
    request = son.ConfigurationSubscriptionRequest(configuration_subscriptions=[subscription])
    configuration_stream = wrapper.open_configuration_stream(request)

    print("\n(listening to configuration stream) press ctrl-c to stop stream")
    try:
        # Keep draining the stream until the user interrupts with ctrl-c.
        while True:
            for update in configuration_stream.recv():
                print(type(update), update)
                print("(listening to configuration stream) press ctrl-c to stop stream")
    except KeyboardInterrupt:
        pass
import json
import socket

# Raw-socket version of the configuration service example: JSON-encoded
# request envelopes are written straight to a TCP connection, without the
# sonardyne_api wrapper classes.

# Request (uid 1): configurations matched by the type-name suffix "Configuration".
get_all_configurations = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.GetConfigurationRequest",
            "matchingCriteria": {
                "matchTypeNameSuffix": "Configuration"
            }
        }
    ]
}

# Request (uid 2): configurations matched by the suffix "SourceConfiguration".
get_source_configurations = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 2,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.GetConfigurationRequest",
            "matchingCriteria": {
                "matchTypeNameSuffix": "SourceConfiguration"
            }
        }
    ]
}

# Subscription request carried in a ConfigurationEnvelope (no uid).
subscribe_to_configurations = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.ConfigurationEnvelope",
            "configurationMessages": [
                {
                    "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.ConfigurationSubscriptionRequest",
                    "configurationSubscriptions": [
                        {
                            "matchingCriteria": {
                                "matchTypeNameSuffix": "Configuration"
                            }
                        }
                    ]
                }
            ]
        }
    ]
}

# The connection is bound to `sock` so the `socket` module is not shadowed,
# and the `with` block closes it on every exit path.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect(('0.0.0.0', 8103))
    for envelope in (
        get_all_configurations,
        get_source_configurations,
        subscribe_to_configurations,
    ):
        sock.sendall(json.dumps(envelope, indent=4).encode())

    try:
        # recv() returns b"" once the peer has closed the connection; stop
        # then instead of polling an exhausted socket forever.
        while data := sock.recv(10240):
            print(data.decode())
    except KeyboardInterrupt:
        # ctrl-c ends the listen loop, as in the wrapper-based examples.
        pass

Command Service

Example code: command_service.py

import sonardyne_api as son

with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    # Build the command message first, then hand it to the wrapper to send.
    reset_command = son.InsAlgorithmCommandReset()
    wrapper.send_command(reset_command)
import sonardyne_api as son

with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    # Build the command message first, then hand it to the wrapper to send.
    reset_command = son.InsAlgorithmCommandReset()
    wrapper.send_command(reset_command)
import sonardyne_api as son

with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    # Build the command message first, then hand it to the wrapper to send.
    reset_command = son.InsAlgorithmCommandReset()
    wrapper.send_command(reset_command)
import json
import socket

# Raw-socket version of the command service example: a SendCommandRequest
# carrying an InsAlgorithmCommandReset, JSON-encoded and written to TCP.
send_reset_command = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.command_service.SendCommandRequest",
            "command": {
                "@type": "type.googleapis.com/sonardyne.api.algorithms.ins_algorithm.InsAlgorithmCommandReset"
            }
        }
    ]
}

# The connection is bound to `sock` so the `socket` module is not shadowed,
# and the `with` block closes it once the request has been written.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect(('0.0.0.0', 8103))
    sock.sendall(json.dumps(send_reset_command, indent=4).encode())

Comms Service

Example code: comms_service.py

import sonardyne_api as son
import time
import random

with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    # Open a comms stream subscribed to the port with UID name "TCP-8110".
    port_filter = son.MatchingCriteria(match_uids=[son.UniqueID(name="TCP-8110")])
    subscription = son.CommsSubscription(matching_criteria=port_filter)
    comms_stream = wrapper.open_comms_stream(
        son.CommsSubscriptionRequest(comms_subscriptions=[subscription])
    )

    # write to stream
    print("\n(writing to comms stream) press ctrl-c to stop stream")
    try:
        while True:
            # Send a greeting with a random number, then pause for a second.
            payload = f"Hello World! {random.randrange(100)}".encode()
            outgoing = son.DataPortComms(
                uid=son.UniqueID(name="TCP-8110"),
                data_direction=son.DataDirection.DATA_DIRECTION_OUTPUT,
                data=payload,
            )
            comms_stream.send(outgoing)
            time.sleep(1)
    except KeyboardInterrupt:
        pass

    # read from stream
    print("\n(listening to comms stream) press ctrl-c to stop stream")
    try:
        while True:
            for received in comms_stream.recv():
                print(type(received), received)
                print("(listening to comms stream) press ctrl-c to stop stream")
    except KeyboardInterrupt:
        pass
import sonardyne_api as son
import time
import random

with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    # Open a comms stream subscribed to the port with UID name "TCP-8110".
    port_filter = son.MatchingCriteria(match_uids=[son.UniqueID(name="TCP-8110")])
    subscription = son.CommsSubscription(matching_criteria=port_filter)
    comms_stream = wrapper.open_comms_stream(
        son.CommsSubscriptionRequest(comms_subscriptions=[subscription])
    )

    # write to stream
    print("\n(writing to comms stream) press ctrl-c to stop stream")
    try:
        while True:
            # Send a greeting with a random number, then pause for a second.
            payload = f"Hello World! {random.randrange(100)}".encode()
            outgoing = son.DataPortComms(
                uid=son.UniqueID(name="TCP-8110"),
                data_direction=son.DataDirection.DATA_DIRECTION_OUTPUT,
                data=payload,
            )
            comms_stream.send(outgoing)
            time.sleep(1)
    except KeyboardInterrupt:
        pass

    # read from stream
    print("\n(listening to comms stream) press ctrl-c to stop stream")
    try:
        while True:
            for received in comms_stream.recv():
                print(type(received), received)
                print("(listening to comms stream) press ctrl-c to stop stream")
    except KeyboardInterrupt:
        pass
import sonardyne_api as son
import time
import random

with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    # Open a comms stream subscribed to the port with UID name "TCP-8110".
    port_filter = son.MatchingCriteria(match_uids=[son.UniqueID(name="TCP-8110")])
    subscription = son.CommsSubscription(matching_criteria=port_filter)
    comms_stream = wrapper.open_comms_stream(
        son.CommsSubscriptionRequest(comms_subscriptions=[subscription])
    )

    # write to stream
    print("\n(writing to comms stream) press ctrl-c to stop stream")
    try:
        while True:
            # Send a greeting with a random number, then pause for a second.
            payload = f"Hello World! {random.randrange(100)}".encode()
            outgoing = son.DataPortComms(
                uid=son.UniqueID(name="TCP-8110"),
                data_direction=son.DataDirection.DATA_DIRECTION_OUTPUT,
                data=payload,
            )
            comms_stream.send(outgoing)
            time.sleep(1)
    except KeyboardInterrupt:
        pass

    # read from stream
    print("\n(listening to comms stream) press ctrl-c to stop stream")
    try:
        while True:
            for received in comms_stream.recv():
                print(type(received), received)
                print("(listening to comms stream) press ctrl-c to stop stream")
    except KeyboardInterrupt:
        pass
import json
import socket

# Raw-socket version of the comms service example: subscribe to the port
# with UID name "TCP-8110" and print whatever comes back on the connection.
subscribe_to_comms = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.comms_service.CommsEnvelope",
            "commsMessages": [
                {
                    "@type": "type.googleapis.com/sonardyne.api.services.comms_service.CommsSubscriptionRequest",
                    "commsSubscriptions": [
                        {
                            "matchingCriteria": {
                                "matchUids": [
                                    {
                                        "name": "TCP-8110"
                                    }
                                ]
                            }
                        }
                    ]
                }
            ]
        }
    ]
}

# The connection is bound to `sock` so the `socket` module is not shadowed,
# and the `with` block closes it on every exit path.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect(('0.0.0.0', 8103))
    sock.sendall(json.dumps(subscribe_to_comms, indent=4).encode())

    try:
        # recv() returns b"" once the peer has closed the connection; stop
        # then instead of polling an exhausted socket forever.
        while data := sock.recv(10240):
            print(data.decode())
    except KeyboardInterrupt:
        # ctrl-c ends the listen loop, as in the wrapper-based examples.
        pass

Sources

The Source component represents some "aiding" or "input" source, e.g. GNSS or Time.

The API can be used to get and set the configuration of a particular source. This could include, for example, the source's lever arms, which relate the position of the source to the central reference point of the vessel.

Sources can also send and receive SourceObservations using the ObservationService.

GNSS Source

GNSS Source Configuration

Example code: gnss_source_configuration.py

import sonardyne_api as son
import time

with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    # Set the GNSS source's input data port to the one named "External Control"
    # and print whatever set_configuration returns.
    external_control = son.DataPortReference(set_uid=son.UniqueID(name="External Control"))
    response = wrapper.set_configuration(
        son.GnssSourceConfiguration(input_data_port=external_control)
    )
    print(response)
import sonardyne_api as son
import time

with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    # Set the GNSS source's input data port to the one named "External Control"
    # and print whatever set_configuration returns.
    external_control = son.DataPortReference(set_uid=son.UniqueID(name="External Control"))
    response = wrapper.set_configuration(
        son.GnssSourceConfiguration(input_data_port=external_control)
    )
    print(response)
import sonardyne_api as son
import time

with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    # Set the GNSS source's input data port to the one named "External Control"
    # and print whatever set_configuration returns.
    external_control = son.DataPortReference(set_uid=son.UniqueID(name="External Control"))
    response = wrapper.set_configuration(
        son.GnssSourceConfiguration(input_data_port=external_control)
    )
    print(response)
import json
import socket

# Raw-socket version of the GNSS source configuration example: a
# SetConfigurationRequest that sets the GNSS source's input data port to the
# one named "External Control".
set_gnss_source_configuration = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
            "configuration": {
                "@type": "type.googleapis.com/sonardyne.api.sources.gnss_source.GnssSourceConfiguration",
                "inputDataPort": {
                    "setUid": {
                        "name": "External Control"
                    }
                }
            }
        }
    ]
}

# The connection is bound to `sock` so the `socket` module is not shadowed,
# and the `with` block closes it once the request has been written.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect(('0.0.0.0', 8103))
    sock.sendall(json.dumps(set_gnss_source_configuration, indent=4).encode())

GNSS Source Observation

Example code: gnss_source_observation.py

import sonardyne_api as son
import time

with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    # write to stream
    observation_stream = wrapper.open_observation_stream()
    print("\n(writing to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            # Build a GGA telegram stamped with the current time, send it
            # wrapped in a GnssSourceObservation, then pause for a second.
            gga = son.GgaTelegram(
                time_of_validity=son.Timestamp(common_time_seconds=time.time()),
                latitude_radians=0.0,
                longitude_radians=0.0,
                altitude_metres=0.0,
                geoid_separation_metres=0.0,
                hdop=0.5,
                age_seconds=0.0,
                number_of_satellites=10,
                reference_station_id=None,
                fix_quality=son.GgaTelegram.FIX_QUALITY_GNSS_FIX,
            )
            observation_stream.send(son.GnssSourceObservation(gga_telegram=gga))
            time.sleep(1)
    except KeyboardInterrupt:
        pass

    # read from stream
    gnss_filter = son.MatchingCriteria(match_type_name_suffix="GnssSourceObservation")
    subscription = son.ObservationSubscription(matching_criteria=gnss_filter)
    observation_stream = wrapper.open_observation_stream(
        son.ObservationSubscriptionRequest(observation_subscriptions=[subscription])
    )
    print("\n(listening to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            for received in observation_stream.recv():
                print(type(received), received)
                print("(listening to observation stream) press ctrl-c to stop stream")
    except KeyboardInterrupt:
        pass
import sonardyne_api as son
import time

with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    # write to stream
    observation_stream = wrapper.open_observation_stream()
    print("\n(writing to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            # Build a GGA telegram stamped with the current time, send it
            # wrapped in a GnssSourceObservation, then pause for a second.
            gga = son.GgaTelegram(
                time_of_validity=son.Timestamp(common_time_seconds=time.time()),
                latitude_radians=0.0,
                longitude_radians=0.0,
                altitude_metres=0.0,
                geoid_separation_metres=0.0,
                hdop=0.5,
                age_seconds=0.0,
                number_of_satellites=10,
                reference_station_id=None,
                fix_quality=son.GgaTelegram.FIX_QUALITY_GNSS_FIX,
            )
            observation_stream.send(son.GnssSourceObservation(gga_telegram=gga))
            time.sleep(1)
    except KeyboardInterrupt:
        pass

    # read from stream
    gnss_filter = son.MatchingCriteria(match_type_name_suffix="GnssSourceObservation")
    subscription = son.ObservationSubscription(matching_criteria=gnss_filter)
    observation_stream = wrapper.open_observation_stream(
        son.ObservationSubscriptionRequest(observation_subscriptions=[subscription])
    )
    print("\n(listening to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            for received in observation_stream.recv():
                print(type(received), received)
                print("(listening to observation stream) press ctrl-c to stop stream")
    except KeyboardInterrupt:
        pass
import sonardyne_api as son
import time

with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    # write to stream
    observation_stream = wrapper.open_observation_stream()
    print("\n(writing to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            # Build a GGA telegram stamped with the current time, send it
            # wrapped in a GnssSourceObservation, then pause for a second.
            gga = son.GgaTelegram(
                time_of_validity=son.Timestamp(common_time_seconds=time.time()),
                latitude_radians=0.0,
                longitude_radians=0.0,
                altitude_metres=0.0,
                geoid_separation_metres=0.0,
                hdop=0.5,
                age_seconds=0.0,
                number_of_satellites=10,
                reference_station_id=None,
                fix_quality=son.GgaTelegram.FIX_QUALITY_GNSS_FIX,
            )
            observation_stream.send(son.GnssSourceObservation(gga_telegram=gga))
            time.sleep(1)
    except KeyboardInterrupt:
        pass

    # read from stream
    gnss_filter = son.MatchingCriteria(match_type_name_suffix="GnssSourceObservation")
    subscription = son.ObservationSubscription(matching_criteria=gnss_filter)
    observation_stream = wrapper.open_observation_stream(
        son.ObservationSubscriptionRequest(observation_subscriptions=[subscription])
    )
    print("\n(listening to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            for received in observation_stream.recv():
                print(type(received), received)
                print("(listening to observation stream) press ctrl-c to stop stream")
    except KeyboardInterrupt:
        pass
import json
import socket
import time

# Raw-socket version of the GNSS source observation example: send one
# GnssSourceObservation, subscribe to GnssSourceObservation messages, then
# print whatever comes back on the connection.

# The observation is stamped with the current time, as the wrapper-based
# examples do; a fixed literal timestamp goes stale as soon as it is written.
gnss_observation = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationEnvelope",
            "observationMessages": [
                {
                    "@type": "type.googleapis.com/sonardyne.api.sources.gnss_source.GnssSourceObservation",
                    "ggaTelegram": {
                        "timeOfValidity": {
                            "commonTimeSeconds": time.time()
                        },
                        "hdop": 0.5,
                        "numberOfSatellites": 10,
                        "fixQuality": "FIX_QUALITY_GNSS_FIX"
                    }
                }
            ]
        }
    ]
}

subscribe_to_gnss_observations = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationEnvelope",
            "observationMessages": [
                {
                    "@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationSubscriptionRequest",
                    "observationSubscriptions": [
                        {
                            "matchingCriteria": {
                                "matchTypeNameSuffix": "GnssSourceObservation"
                            }
                        }
                    ]
                }
            ]
        }
    ]
}

# The connection is bound to `sock` so the `socket` module is not shadowed,
# and the `with` block closes it on every exit path.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect(('0.0.0.0', 8103))
    for envelope in (gnss_observation, subscribe_to_gnss_observations):
        sock.sendall(json.dumps(envelope, indent=4).encode())

    try:
        # recv() returns b"" once the peer has closed the connection; stop
        # then instead of polling an exhausted socket forever.
        while data := sock.recv(10240):
            print(data.decode())
    except KeyboardInterrupt:
        # ctrl-c ends the listen loop, as in the wrapper-based examples.
        pass

Time Module Configuration

Example code: time_module_configuration.py

import sonardyne_api as son
import time

with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    # Time source selection: ZDA set to "any", NTP and PPS set to "none".
    time_inputs = son.TimeSourceInputReferences(
        input_ntp_time_source=son.NtpTimeSourceReference(set_none=True),
        input_zda_time_source=son.ZdaTimeSourceReference(set_any=True),
        input_pps_time_source=son.PpsTimeSourceReference(set_none=True),
    )
    # Apply the configuration and print whatever set_configuration returns.
    response = wrapper.set_configuration(
        son.TimeModuleConfiguration(input_time_sources=time_inputs)
    )
    print(response)
import sonardyne_api as son
import time

with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    # Time source selection: ZDA set to "any", NTP and PPS set to "none".
    time_inputs = son.TimeSourceInputReferences(
        input_ntp_time_source=son.NtpTimeSourceReference(set_none=True),
        input_zda_time_source=son.ZdaTimeSourceReference(set_any=True),
        input_pps_time_source=son.PpsTimeSourceReference(set_none=True),
    )
    # Apply the configuration and print whatever set_configuration returns.
    response = wrapper.set_configuration(
        son.TimeModuleConfiguration(input_time_sources=time_inputs)
    )
    print(response)
import sonardyne_api as son
import time

with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    # Time source selection: ZDA set to "any", NTP and PPS set to "none".
    time_inputs = son.TimeSourceInputReferences(
        input_ntp_time_source=son.NtpTimeSourceReference(set_none=True),
        input_zda_time_source=son.ZdaTimeSourceReference(set_any=True),
        input_pps_time_source=son.PpsTimeSourceReference(set_none=True),
    )
    # Apply the configuration and print whatever set_configuration returns.
    response = wrapper.set_configuration(
        son.TimeModuleConfiguration(input_time_sources=time_inputs)
    )
    print(response)
import json
import socket

# Raw-socket version of the time module configuration example: a
# SetConfigurationRequest with ZDA set to "any" and PPS/NTP set to "none".
#
# The flags are Python booleans (True); json.dumps serialises them as JSON
# `true`. Writing the JSON spelling `true` directly in a Python dict raises
# NameError.
set_time_module_configuration = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
            "configuration": {
                "@type": "type.googleapis.com/sonardyne.api.modules.time_module.TimeModuleConfiguration",
                "inputTimeSources": {
                    "inputZdaTimeSource": {
                        "setAny": True
                    },
                    "inputPpsTimeSource": {
                        "setNone": True
                    },
                    "inputNtpTimeSource": {
                        "setNone": True
                    }
                }
            }
        }
    ]
}

# The connection is bound to `sock` so the `socket` module is not shadowed,
# and the `with` block closes it once the request has been written.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect(('0.0.0.0', 8103))
    sock.sendall(json.dumps(set_time_module_configuration, indent=4).encode())

XPOS Source

XPOS Source Observation

Example code: xpos_source_observation.py

import sonardyne_api as son
import time

with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    # write to stream
    observation_stream = wrapper.open_observation_stream()
    print("\n(writing to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            # Build XPOS data stamped with the current time, send it wrapped
            # in an XposSourceObservation, then pause for a second.
            xpos = son.XposData(
                time_of_validity=son.Timestamp(common_time_seconds=time.time()),
                latitude_radians=0.0,
                longitude_radians=0.0,
                depth_metres=0.0,
                depth_uncertainty_metres=1.0,
                horizontal_position_uncertainty_metres=0.5,
                source=son.XPOS_SOURCE_GNSS,
            )
            observation_stream.send(son.XposSourceObservation(xpos_data=xpos))
            time.sleep(1)
    except KeyboardInterrupt:
        pass

    # read from stream
    xpos_filter = son.MatchingCriteria(match_type_name_suffix="XposSourceObservation")
    subscription = son.ObservationSubscription(matching_criteria=xpos_filter)
    observation_stream = wrapper.open_observation_stream(
        son.ObservationSubscriptionRequest(observation_subscriptions=[subscription])
    )
    print("\n(listening to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            for received in observation_stream.recv():
                print(type(received), received)
                print("(listening to observation stream) press ctrl-c to stop stream")
    except KeyboardInterrupt:
        pass
import sonardyne_api as son
import time

with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    # write to stream
    observation_stream = wrapper.open_observation_stream()
    print("\n(writing to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            # Build XPOS data stamped with the current time, send it wrapped
            # in an XposSourceObservation, then pause for a second.
            xpos = son.XposData(
                time_of_validity=son.Timestamp(common_time_seconds=time.time()),
                latitude_radians=0.0,
                longitude_radians=0.0,
                depth_metres=0.0,
                depth_uncertainty_metres=1.0,
                horizontal_position_uncertainty_metres=0.5,
                source=son.XPOS_SOURCE_GNSS,
            )
            observation_stream.send(son.XposSourceObservation(xpos_data=xpos))
            time.sleep(1)
    except KeyboardInterrupt:
        pass

    # read from stream
    xpos_filter = son.MatchingCriteria(match_type_name_suffix="XposSourceObservation")
    subscription = son.ObservationSubscription(matching_criteria=xpos_filter)
    observation_stream = wrapper.open_observation_stream(
        son.ObservationSubscriptionRequest(observation_subscriptions=[subscription])
    )
    print("\n(listening to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            for received in observation_stream.recv():
                print(type(received), received)
                print("(listening to observation stream) press ctrl-c to stop stream")
    except KeyboardInterrupt:
        pass
import sonardyne_api as son
import time

with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    # write to stream
    observation_stream = wrapper.open_observation_stream()
    print("\n(writing to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            # Build XPOS data stamped with the current time, send it wrapped
            # in an XposSourceObservation, then pause for a second.
            xpos = son.XposData(
                time_of_validity=son.Timestamp(common_time_seconds=time.time()),
                latitude_radians=0.0,
                longitude_radians=0.0,
                depth_metres=0.0,
                depth_uncertainty_metres=1.0,
                horizontal_position_uncertainty_metres=0.5,
                source=son.XPOS_SOURCE_GNSS,
            )
            observation_stream.send(son.XposSourceObservation(xpos_data=xpos))
            time.sleep(1)
    except KeyboardInterrupt:
        pass

    # read from stream
    xpos_filter = son.MatchingCriteria(match_type_name_suffix="XposSourceObservation")
    subscription = son.ObservationSubscription(matching_criteria=xpos_filter)
    observation_stream = wrapper.open_observation_stream(
        son.ObservationSubscriptionRequest(observation_subscriptions=[subscription])
    )
    print("\n(listening to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            for received in observation_stream.recv():
                print(type(received), received)
                print("(listening to observation stream) press ctrl-c to stop stream")
    except KeyboardInterrupt:
        pass
import json
import socket
import time

# Raw-socket version of the XPOS source observation example: send one
# XposSourceObservation, subscribe to XposSourceObservation messages, then
# print whatever comes back on the connection.

# The observation is stamped with the current time, as the wrapper-based
# examples do; a fixed literal timestamp goes stale as soon as it is written.
xpos_observation = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationEnvelope",
            "observationMessages": [
                {
                    "@type": "type.googleapis.com/sonardyne.api.sources.xpos_source.XposSourceObservation",
                    "xposData": {
                        "timeOfValidity": {
                            "commonTimeSeconds": time.time()
                        },
                        "horizontalPositionUncertaintyMetres": 0.5,
                        "depthUncertaintyMetres": 1.0,
                        "source": "XPOS_SOURCE_GNSS"
                    }
                }
            ]
        }
    ]
}

subscribe_to_xpos_observations = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationEnvelope",
            "observationMessages": [
                {
                    "@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationSubscriptionRequest",
                    "observationSubscriptions": [
                        {
                            "matchingCriteria": {
                                "matchTypeNameSuffix": "XposSourceObservation"
                            }
                        }
                    ]
                }
            ]
        }
    ]
}

# The connection is bound to `sock` so the `socket` module is not shadowed,
# and the `with` block closes it on every exit path.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect(('0.0.0.0', 8103))
    for envelope in (xpos_observation, subscribe_to_xpos_observations):
        sock.sendall(json.dumps(envelope, indent=4).encode())

    try:
        # recv() returns b"" once the peer has closed the connection; stop
        # then instead of polling an exhausted socket forever.
        while data := sock.recv(10240):
            print(data.decode())
    except KeyboardInterrupt:
        # ctrl-c ends the listen loop, as in the wrapper-based examples.
        pass

XPOS Source Configuration

Example code: xpos_source_configuration.py

import sonardyne_api as son
import time

with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    # Set the XPOS source's input data port to the one named "External Control"
    # and print whatever set_configuration returns.
    external_control = son.DataPortReference(set_uid=son.UniqueID(name="External Control"))
    response = wrapper.set_configuration(
        son.XposSourceConfiguration(input_data_port=external_control)
    )
    print(response)
import sonardyne_api as son
import time

with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    # Set the XPOS source's input data port to the one named "External Control"
    # and print whatever set_configuration returns.
    external_control = son.DataPortReference(set_uid=son.UniqueID(name="External Control"))
    response = wrapper.set_configuration(
        son.XposSourceConfiguration(input_data_port=external_control)
    )
    print(response)
import sonardyne_api as son
import time

with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    # Set the XPOS source's input data port to the one named "External Control"
    # and print whatever set_configuration returns.
    external_control = son.DataPortReference(set_uid=son.UniqueID(name="External Control"))
    response = wrapper.set_configuration(
        son.XposSourceConfiguration(input_data_port=external_control)
    )
    print(response)
import json
import socket

# Raw-socket version of the XPOS source configuration example: a
# SetConfigurationRequest that sets the XPOS source's input data port to the
# one named "External Control".
set_xpos_source_configuration = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
            "configuration": {
                "@type": "type.googleapis.com/sonardyne.api.sources.xpos_source.XposSourceConfiguration",
                "inputDataPort": {
                    "setUid": {
                        "name": "External Control"
                    }
                }
            }
        }
    ]
}

# The connection is bound to `sock` so the `socket` module is not shadowed,
# and the `with` block closes it once the request has been written.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect(('0.0.0.0', 8103))
    sock.sendall(json.dumps(set_xpos_source_configuration, indent=4).encode())

SUSBL Source

SUSBL Source Configuration

Example code: susbl_source_configuration.py

import sonardyne_api as son
import time

with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    # Set the SUSBL source's input data port to the one named "External Control"
    # and print whatever set_configuration returns.
    external_control = son.DataPortReference(set_uid=son.UniqueID(name="External Control"))
    response = wrapper.set_configuration(
        son.SusblSourceConfiguration(input_data_port=external_control)
    )
    print(response)
import sonardyne_api as son
import time

with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    # Set the SUSBL source's input data port to the one named "External Control"
    # and print whatever set_configuration returns.
    external_control = son.DataPortReference(set_uid=son.UniqueID(name="External Control"))
    response = wrapper.set_configuration(
        son.SusblSourceConfiguration(input_data_port=external_control)
    )
    print(response)
import sonardyne_api as son
import time

with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    # Set the SUSBL source's input data port to the one named "External Control"
    # and print whatever set_configuration returns.
    external_control = son.DataPortReference(set_uid=son.UniqueID(name="External Control"))
    response = wrapper.set_configuration(
        son.SusblSourceConfiguration(input_data_port=external_control)
    )
    print(response)
import json
import socket

# Raw-socket version of the SUSBL source configuration example: a
# SetConfigurationRequest that sets the SUSBL source's input data port to the
# one named "External Control".
set_susbl_source_configuration = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
            "configuration": {
                "@type": "type.googleapis.com/sonardyne.api.sources.susbl_source.SusblSourceConfiguration",
                "inputDataPort": {
                    "setUid": {
                        "name": "External Control"
                    }
                }
            }
        }
    ]
}

# The connection is bound to `sock` so the `socket` module is not shadowed,
# and the `with` block closes it once the request has been written.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect(('0.0.0.0', 8103))
    sock.sendall(json.dumps(set_susbl_source_configuration, indent=4).encode())

SUSBL Source Observation

Example code: susbl_source_observation.py

import sonardyne_api as son
import time

with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    # write to stream
    observation_stream = wrapper.open_observation_stream()
    print("\n(writing to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            # Build a PSIMSSB telegram stamped with the current time, send it
            # wrapped in a SusblSourceObservation, then pause for a second.
            psimssb = son.PsimssbTelegram(
                time_of_validity=son.Timestamp(common_time_seconds=time.time()),
                coordinate_type=son.PsimssbTelegram.PSIMSSB_COORDINATE_SYSTEM_RADIANS,
                x_coordinate=0,
                y_coordinate=0,
                transponder_code=0,
                is_valid=True,
                orientation_type=son.PsimssbTelegram.PSIMSSB_ORIENTATION_NORTH,
                error_code="",
                software_filter=son.PsimssbTelegram.PSIMSSB_SOFTWARE_FILTER_UNKNOWN,
                depth_metres=0,
                position_uncertainty_metres=1,
            )
            observation_stream.send(son.SusblSourceObservation(psimssb_telegram=psimssb))
            time.sleep(1)
    except KeyboardInterrupt:
        pass

    # read from stream
    susbl_filter = son.MatchingCriteria(match_type_name_suffix="SusblSourceObservation")
    subscription = son.ObservationSubscription(matching_criteria=susbl_filter)
    observation_stream = wrapper.open_observation_stream(
        son.ObservationSubscriptionRequest(observation_subscriptions=[subscription])
    )
    print("\n(listening to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            for received in observation_stream.recv():
                print(type(received), received)
                print("(listening to observation stream) press ctrl-c to stop stream")
    except KeyboardInterrupt:
        pass
import sonardyne_api as son
import time

with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    # write to stream
    observation_stream = wrapper.open_observation_stream()
    print("\n(writing to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            # Build a PSIMSSB telegram stamped with the current time, send it
            # wrapped in a SusblSourceObservation, then pause for a second.
            psimssb = son.PsimssbTelegram(
                time_of_validity=son.Timestamp(common_time_seconds=time.time()),
                coordinate_type=son.PsimssbTelegram.PSIMSSB_COORDINATE_SYSTEM_RADIANS,
                x_coordinate=0,
                y_coordinate=0,
                transponder_code=0,
                is_valid=True,
                orientation_type=son.PsimssbTelegram.PSIMSSB_ORIENTATION_NORTH,
                error_code="",
                software_filter=son.PsimssbTelegram.PSIMSSB_SOFTWARE_FILTER_UNKNOWN,
                depth_metres=0,
                position_uncertainty_metres=1,
            )
            observation_stream.send(son.SusblSourceObservation(psimssb_telegram=psimssb))
            time.sleep(1)
    except KeyboardInterrupt:
        pass

    # read from stream
    susbl_filter = son.MatchingCriteria(match_type_name_suffix="SusblSourceObservation")
    subscription = son.ObservationSubscription(matching_criteria=susbl_filter)
    observation_stream = wrapper.open_observation_stream(
        son.ObservationSubscriptionRequest(observation_subscriptions=[subscription])
    )
    print("\n(listening to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            for received in observation_stream.recv():
                print(type(received), received)
                print("(listening to observation stream) press ctrl-c to stop stream")
    except KeyboardInterrupt:
        pass
import sonardyne_api as son
import time

with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    # Phase 1: send one SUSBL observation, sleep a second, repeat until ctrl-c.
    observation_stream = wrapper.open_observation_stream()
    print("\n(writing to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            # Rebuilt every iteration so time_of_validity carries the current time.
            telegram = son.PsimssbTelegram(
                time_of_validity=son.Timestamp(common_time_seconds=time.time()),
                coordinate_type=son.PsimssbTelegram.PSIMSSB_COORDINATE_SYSTEM_RADIANS,
                x_coordinate=0,
                y_coordinate=0,
                transponder_code=0,
                is_valid=True,
                orientation_type=son.PsimssbTelegram.PSIMSSB_ORIENTATION_NORTH,
                error_code="",
                software_filter=son.PsimssbTelegram.PSIMSSB_SOFTWARE_FILTER_UNKNOWN,
                depth_metres=0,
                position_uncertainty_metres=1,
            )
            observation_stream.send(son.SusblSourceObservation(psimssb_telegram=telegram))
            time.sleep(1)
    except KeyboardInterrupt:
        pass

    # Phase 2: open a stream subscribed to messages whose type name ends in
    # "SusblSourceObservation" and print everything received until ctrl-c.
    suffix_filter = son.MatchingCriteria(match_type_name_suffix="SusblSourceObservation")
    subscription_request = son.ObservationSubscriptionRequest(
        observation_subscriptions=[son.ObservationSubscription(matching_criteria=suffix_filter)]
    )
    observation_stream = wrapper.open_observation_stream(subscription_request)
    print("\n(listening to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            for received in observation_stream.recv():
                print(type(received), received)
                print("(listening to observation stream) press ctrl-c to stop stream")
    except KeyboardInterrupt:
        pass
import json, socket

# Request 1: publish a SusblSourceObservation carrying a PSIMSSB telegram.
# Python booleans are spelled True/False; json.dumps() writes them as the
# JSON literals true/false.
publish_request = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationEnvelope",
            "observationMessages": [
                {
                    "@type": "type.googleapis.com/sonardyne.api.sources.susbl_source.SusblSourceObservation",
                    "psimssbTelegram": {
                        "timeOfValidity": {
                            "commonTimeSeconds": 1772458214.0693219
                        },
                        "coordinateType": "PSIMSSB_COORDINATE_SYSTEM_RADIANS",
                        "isValid": True,
                        "orientationType": "PSIMSSB_ORIENTATION_NORTH",
                        "softwareFilter": "PSIMSSB_SOFTWARE_FILTER_UNKNOWN",
                        "positionUncertaintyMetres": 1.0
                    }
                }
            ]
        }
    ]
}

# Request 2: subscribe to observations whose type name ends in
# "SusblSourceObservation".
subscribe_request = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationEnvelope",
            "observationMessages": [
                {
                    "@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationSubscriptionRequest",
                    "observationSubscriptions": [
                        {
                            "matchingCriteria": {
                                "matchTypeNameSuffix": "SusblSourceObservation"
                            }
                        }
                    ]
                }
            ]
        }
    ]
}

# The connection is named `sock` so the `socket` module is not shadowed, and
# the `with` block closes it when the script leaves the block.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect(('0.0.0.0', 8103))
    sock.sendall(json.dumps(publish_request, indent=4).encode())
    sock.sendall(json.dumps(subscribe_request, indent=4).encode())

    # recv() returns b"" once the peer has closed the connection; stop then
    # instead of spinning forever on empty reads.
    while data := sock.recv(10240):
        print(data.decode())

Sound Velocity Source

Sound Velocity Source Observation

Example code: sound_velocity_source_observation.py

import sonardyne_api as son
import time
import random

with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    # Phase 1: send one sound velocity observation, sleep a second, repeat
    # until ctrl-c.
    observation_stream = wrapper.open_observation_stream()
    print("\n(writing to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            stamp = son.Timestamp(common_time_seconds=time.time())
            # 1500.0 plus a random offset drawn from randrange(-500, 500) * 0.01.
            velocity = 1500.0 + random.randrange(-500,500) * 0.01
            sample = son.SoundVelocityData(
                time_of_validity=stamp,
                sound_velocity_meters_per_second=velocity,
            )
            observation_stream.send(son.SoundVelocitySourceObservation(sound_velocity_data=sample))
            time.sleep(1)
    except KeyboardInterrupt:
        pass

    # Phase 2: open a stream subscribed to messages whose type name ends in
    # "SoundVelocitySourceObservation" and print everything received until ctrl-c.
    suffix_filter = son.MatchingCriteria(match_type_name_suffix="SoundVelocitySourceObservation")
    subscription_request = son.ObservationSubscriptionRequest(
        observation_subscriptions=[son.ObservationSubscription(matching_criteria=suffix_filter)]
    )
    observation_stream = wrapper.open_observation_stream(subscription_request)
    print("\n(listening to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            for received in observation_stream.recv():
                print(type(received), received)
                print("(listening to observation stream) press ctrl-c to stop stream")
    except KeyboardInterrupt:
        pass
import sonardyne_api as son
import time
import random

with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    # Phase 1: send one sound velocity observation, sleep a second, repeat
    # until ctrl-c.
    observation_stream = wrapper.open_observation_stream()
    print("\n(writing to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            stamp = son.Timestamp(common_time_seconds=time.time())
            # 1500.0 plus a random offset drawn from randrange(-500, 500) * 0.01.
            velocity = 1500.0 + random.randrange(-500,500) * 0.01
            sample = son.SoundVelocityData(
                time_of_validity=stamp,
                sound_velocity_meters_per_second=velocity,
            )
            observation_stream.send(son.SoundVelocitySourceObservation(sound_velocity_data=sample))
            time.sleep(1)
    except KeyboardInterrupt:
        pass

    # Phase 2: open a stream subscribed to messages whose type name ends in
    # "SoundVelocitySourceObservation" and print everything received until ctrl-c.
    suffix_filter = son.MatchingCriteria(match_type_name_suffix="SoundVelocitySourceObservation")
    subscription_request = son.ObservationSubscriptionRequest(
        observation_subscriptions=[son.ObservationSubscription(matching_criteria=suffix_filter)]
    )
    observation_stream = wrapper.open_observation_stream(subscription_request)
    print("\n(listening to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            for received in observation_stream.recv():
                print(type(received), received)
                print("(listening to observation stream) press ctrl-c to stop stream")
    except KeyboardInterrupt:
        pass
import sonardyne_api as son
import time
import random

with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    # Phase 1: send one sound velocity observation, sleep a second, repeat
    # until ctrl-c.
    observation_stream = wrapper.open_observation_stream()
    print("\n(writing to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            stamp = son.Timestamp(common_time_seconds=time.time())
            # 1500.0 plus a random offset drawn from randrange(-500, 500) * 0.01.
            velocity = 1500.0 + random.randrange(-500,500) * 0.01
            sample = son.SoundVelocityData(
                time_of_validity=stamp,
                sound_velocity_meters_per_second=velocity,
            )
            observation_stream.send(son.SoundVelocitySourceObservation(sound_velocity_data=sample))
            time.sleep(1)
    except KeyboardInterrupt:
        pass

    # Phase 2: open a stream subscribed to messages whose type name ends in
    # "SoundVelocitySourceObservation" and print everything received until ctrl-c.
    suffix_filter = son.MatchingCriteria(match_type_name_suffix="SoundVelocitySourceObservation")
    subscription_request = son.ObservationSubscriptionRequest(
        observation_subscriptions=[son.ObservationSubscription(matching_criteria=suffix_filter)]
    )
    observation_stream = wrapper.open_observation_stream(subscription_request)
    print("\n(listening to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            for received in observation_stream.recv():
                print(type(received), received)
                print("(listening to observation stream) press ctrl-c to stop stream")
    except KeyboardInterrupt:
        pass
import json, socket

# Request 1: publish a SoundVelocitySourceObservation.
publish_request = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationEnvelope",
            "observationMessages": [
                {
                    "@type": "type.googleapis.com/sonardyne.api.sources.sound_velocity_source.SoundVelocitySourceObservation",
                    "soundVelocityData": {
                        "timeOfValidity": {
                            "commonTimeSeconds": 1772458214.2173042
                        },
                        "soundVelocityMetersPerSecond": 1503.64
                    }
                }
            ]
        }
    ]
}

# Request 2: subscribe to observations whose type name ends in
# "SoundVelocitySourceObservation".
subscribe_request = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationEnvelope",
            "observationMessages": [
                {
                    "@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationSubscriptionRequest",
                    "observationSubscriptions": [
                        {
                            "matchingCriteria": {
                                "matchTypeNameSuffix": "SoundVelocitySourceObservation"
                            }
                        }
                    ]
                }
            ]
        }
    ]
}

# The connection is named `sock` so the `socket` module is not shadowed, and
# the `with` block closes it when the script leaves the block.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect(('0.0.0.0', 8103))
    sock.sendall(json.dumps(publish_request, indent=4).encode())
    sock.sendall(json.dumps(subscribe_request, indent=4).encode())

    # recv() returns b"" once the peer has closed the connection; stop then
    # instead of spinning forever on empty reads.
    while data := sock.recv(10240):
        print(data.decode())

Sound Velocity Source Configuration

Example code: sound_velocity_source_configuration.py

import sonardyne_api as son
import time
import random

with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    # Apply four alternative sound velocity data source references one after
    # another, printing what set_configuration returns for each.
    salinity = son.SoundVelocityDataSourceReference(set_salinity_parts_per_thousand=100)
    print(wrapper.set_configuration(
        son.SoundVelocitySourceConfiguration(input_sound_velocity_data_source_reference=salinity)
    ))

    manual = son.SoundVelocityDataSourceReference(set_manual_sound_velocity_metres_per_second=150)
    print(wrapper.set_configuration(
        son.SoundVelocitySourceConfiguration(input_sound_velocity_data_source_reference=manual)
    ))

    any_ethernet = son.SoundVelocityDataSourceReference(set_any_ethernet_port=True)
    print(wrapper.set_configuration(
        son.SoundVelocitySourceConfiguration(input_sound_velocity_data_source_reference=any_ethernet)
    ))

    any_external_control = son.SoundVelocityDataSourceReference(set_any_external_control_port=True)
    print(wrapper.set_configuration(
        son.SoundVelocitySourceConfiguration(input_sound_velocity_data_source_reference=any_external_control)
    ))
import sonardyne_api as son
import time
import random

with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    # Apply four alternative sound velocity data source references one after
    # another, printing what set_configuration returns for each.
    salinity = son.SoundVelocityDataSourceReference(set_salinity_parts_per_thousand=100)
    print(wrapper.set_configuration(
        son.SoundVelocitySourceConfiguration(input_sound_velocity_data_source_reference=salinity)
    ))

    manual = son.SoundVelocityDataSourceReference(set_manual_sound_velocity_metres_per_second=150)
    print(wrapper.set_configuration(
        son.SoundVelocitySourceConfiguration(input_sound_velocity_data_source_reference=manual)
    ))

    any_ethernet = son.SoundVelocityDataSourceReference(set_any_ethernet_port=True)
    print(wrapper.set_configuration(
        son.SoundVelocitySourceConfiguration(input_sound_velocity_data_source_reference=any_ethernet)
    ))

    any_external_control = son.SoundVelocityDataSourceReference(set_any_external_control_port=True)
    print(wrapper.set_configuration(
        son.SoundVelocitySourceConfiguration(input_sound_velocity_data_source_reference=any_external_control)
    ))
import sonardyne_api as son
import time
import random

with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    # Apply four alternative sound velocity data source references one after
    # another, printing what set_configuration returns for each.
    salinity = son.SoundVelocityDataSourceReference(set_salinity_parts_per_thousand=100)
    print(wrapper.set_configuration(
        son.SoundVelocitySourceConfiguration(input_sound_velocity_data_source_reference=salinity)
    ))

    manual = son.SoundVelocityDataSourceReference(set_manual_sound_velocity_metres_per_second=150)
    print(wrapper.set_configuration(
        son.SoundVelocitySourceConfiguration(input_sound_velocity_data_source_reference=manual)
    ))

    any_ethernet = son.SoundVelocityDataSourceReference(set_any_ethernet_port=True)
    print(wrapper.set_configuration(
        son.SoundVelocitySourceConfiguration(input_sound_velocity_data_source_reference=any_ethernet)
    ))

    any_external_control = son.SoundVelocityDataSourceReference(set_any_external_control_port=True)
    print(wrapper.set_configuration(
        son.SoundVelocitySourceConfiguration(input_sound_velocity_data_source_reference=any_external_control)
    ))
import json, socket

# The four requests differ only in their "uid" (1..4) and in the data source
# reference they carry, so the references are listed once and the envelope is
# built in a loop. Python booleans are spelled True/False; json.dumps() writes
# them as the JSON literals true/false.
data_source_references = [
    {"setSalinityPartsPerThousand": 100.0},
    {"setManualSoundVelocityMetresPerSecond": 150.0},
    {"setAnyEthernetPort": True},
    {"setAnyExternalControlPort": True},
]

# The connection is named `sock` so the `socket` module is not shadowed, and
# the `with` block closes it when the script leaves the block.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect(('0.0.0.0', 8103))
    for uid, reference in enumerate(data_source_references, start=1):
        request = {
            "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
            "uid": uid,
            "requests": [
                {
                    "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
                    "configuration": {
                        "@type": "type.googleapis.com/sonardyne.api.sources.sound_velocity_source.SoundVelocitySourceConfiguration",
                        "inputSoundVelocityDataSourceReference": reference
                    }
                }
            ]
        }
        sock.sendall(json.dumps(request, indent=4).encode())

Time Source

PPS Time Source Configuration

Example code: pps_time_source_configuration.py

import sonardyne_api as son

with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    # Build the PPS configuration (trigger port reference with set_any=True),
    # apply it, and print what set_configuration returns.
    trigger_port = son.TriggerPortReference(set_any=True)
    pps_configuration = son.PpsTimeSourceConfiguration(input_trigger_port=trigger_port)
    print(wrapper.set_configuration(pps_configuration))
import sonardyne_api as son

with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    # Build the PPS configuration (trigger port reference with set_any=True),
    # apply it, and print what set_configuration returns.
    trigger_port = son.TriggerPortReference(set_any=True)
    pps_configuration = son.PpsTimeSourceConfiguration(input_trigger_port=trigger_port)
    print(wrapper.set_configuration(pps_configuration))
import sonardyne_api as son

with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    # Build the PPS configuration (trigger port reference with set_any=True),
    # apply it, and print what set_configuration returns.
    trigger_port = son.TriggerPortReference(set_any=True)
    pps_configuration = son.PpsTimeSourceConfiguration(input_trigger_port=trigger_port)
    print(wrapper.set_configuration(pps_configuration))
import json, socket

# SetConfigurationRequest carrying a PpsTimeSourceConfiguration. Python
# booleans are spelled True/False; json.dumps() writes them as the JSON
# literals true/false.
request = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
            "configuration": {
                "@type": "type.googleapis.com/sonardyne.api.sources.time_source.PpsTimeSourceConfiguration",
                "inputTriggerPort": {
                    "setAny": True
                }
            }
        }
    ]
}

# The connection is named `sock` so the `socket` module is not shadowed, and
# the `with` block closes it when the script leaves the block.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect(('0.0.0.0', 8103))
    sock.sendall(json.dumps(request, indent=4).encode())

NTP Time Source Configuration

Example code: ntp_time_source_configuration.py

import sonardyne_api as son

with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    # Set the NTP server address and print what set_configuration returns.
    ntp_configuration = son.NtpTimeSourceConfiguration(ntp_server_ip_address="230.229.228.227")
    response = wrapper.set_configuration(ntp_configuration)
    print(response)
import sonardyne_api as son

with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    # Set the NTP server address and print what set_configuration returns.
    ntp_configuration = son.NtpTimeSourceConfiguration(ntp_server_ip_address="230.229.228.227")
    response = wrapper.set_configuration(ntp_configuration)
    print(response)
import sonardyne_api as son

with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    # Set the NTP server address and print what set_configuration returns.
    ntp_configuration = son.NtpTimeSourceConfiguration(ntp_server_ip_address="230.229.228.227")
    response = wrapper.set_configuration(ntp_configuration)
    print(response)
import json, socket

# SetConfigurationRequest carrying an NtpTimeSourceConfiguration.
request = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
            "configuration": {
                "@type": "type.googleapis.com/sonardyne.api.sources.time_source.NtpTimeSourceConfiguration",
                "ntpServerIpAddress": "230.229.228.227"
            }
        }
    ]
}

# The connection is named `sock` so the `socket` module is not shadowed, and
# the `with` block closes it when the script leaves the block.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect(('0.0.0.0', 8103))
    sock.sendall(json.dumps(request, indent=4).encode())

ZDA Time Source Observation

Example code: zda_time_source_observation.py

import sonardyne_api as son
import time

with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    # Send one ZDA observation stamped with the current time, sleep a second,
    # and repeat until ctrl-c.
    observation_stream = wrapper.open_observation_stream()
    print("\n(writing to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            zda = son.ZdaData(current_utc_time_seconds=time.time())
            observation_stream.send(son.ZdaTimeSourceObservation(zda_data=zda))
            time.sleep(1)
    except KeyboardInterrupt:
        pass
import sonardyne_api as son
import time

with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    # Send one ZDA observation stamped with the current time, sleep a second,
    # and repeat until ctrl-c.
    observation_stream = wrapper.open_observation_stream()
    print("\n(writing to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            zda = son.ZdaData(current_utc_time_seconds=time.time())
            observation_stream.send(son.ZdaTimeSourceObservation(zda_data=zda))
            time.sleep(1)
    except KeyboardInterrupt:
        pass
import sonardyne_api as son
import time

with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    # Send one ZDA observation stamped with the current time, sleep a second,
    # and repeat until ctrl-c.
    observation_stream = wrapper.open_observation_stream()
    print("\n(writing to observation stream) press ctrl-c to stop stream")
    try:
        while True:
            zda = son.ZdaData(current_utc_time_seconds=time.time())
            observation_stream.send(son.ZdaTimeSourceObservation(zda_data=zda))
            time.sleep(1)
    except KeyboardInterrupt:
        pass
import json, socket

# ObservationEnvelope carrying a single ZdaTimeSourceObservation.
request = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationEnvelope",
            "observationMessages": [
                {
                    "@type": "type.googleapis.com/sonardyne.api.sources.time_source.ZdaTimeSourceObservation",
                    "zdaData": {
                        "currentUtcTimeSeconds": 1772458214.925132
                    }
                }
            ]
        }
    ]
}

# The connection is named `sock` so the `socket` module is not shadowed, and
# the `with` block closes it when the script leaves the block.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect(('0.0.0.0', 8103))
    sock.sendall(json.dumps(request, indent=4).encode())

ZDA Time Source Configuration

Example code: zda_time_source_configuration.py

import sonardyne_api as son

with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    # Build the ZDA configuration (data port reference with
    # set_any_ethernet_port=True), apply it, and print the returned value.
    data_port = son.DataPortReference(set_any_ethernet_port=True)
    zda_configuration = son.ZdaTimeSourceConfiguration(input_data_port=data_port)
    print(wrapper.set_configuration(zda_configuration))
import sonardyne_api as son

with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    # Build the ZDA configuration (data port reference with
    # set_any_ethernet_port=True), apply it, and print the returned value.
    data_port = son.DataPortReference(set_any_ethernet_port=True)
    zda_configuration = son.ZdaTimeSourceConfiguration(input_data_port=data_port)
    print(wrapper.set_configuration(zda_configuration))
import sonardyne_api as son

with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    # Build the ZDA configuration (data port reference with
    # set_any_ethernet_port=True), apply it, and print the returned value.
    data_port = son.DataPortReference(set_any_ethernet_port=True)
    zda_configuration = son.ZdaTimeSourceConfiguration(input_data_port=data_port)
    print(wrapper.set_configuration(zda_configuration))
import json, socket

# SetConfigurationRequest carrying a ZdaTimeSourceConfiguration. Python
# booleans are spelled True/False; json.dumps() writes them as the JSON
# literals true/false.
request = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
            "configuration": {
                "@type": "type.googleapis.com/sonardyne.api.sources.time_source.ZdaTimeSourceConfiguration",
                "inputDataPort": {
                    "setAnyEthernetPort": True
                }
            }
        }
    ]
}

# The connection is named `sock` so the `socket` module is not shadowed, and
# the `with` block closes it when the script leaves the block.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect(('0.0.0.0', 8103))
    sock.sendall(json.dumps(request, indent=4).encode())

Depth Source

Depth Source Configuration

Example code: depth_source_configuration.py

import sonardyne_api as son

with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    # Reference the depth data source by the UID named "TCP-8110", set the
    # lever arms, apply the configuration and print the returned value.
    depth_input = son.DepthDataSourceReference(set_uid=son.UniqueID(name="TCP-8110"))
    offsets = son.LeverArms(forward_metres=1.5, starboard_metres=2, down_metres=3.2)
    depth_configuration = son.DepthSourceConfiguration(
        input_depth_data_source_reference=depth_input,
        lever_arms=offsets,
    )
    print(wrapper.set_configuration(depth_configuration))
import sonardyne_api as son

with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    # Reference the depth data source by the UID named "TCP-8110", set the
    # lever arms, apply the configuration and print the returned value.
    depth_input = son.DepthDataSourceReference(set_uid=son.UniqueID(name="TCP-8110"))
    offsets = son.LeverArms(forward_metres=1.5, starboard_metres=2, down_metres=3.2)
    depth_configuration = son.DepthSourceConfiguration(
        input_depth_data_source_reference=depth_input,
        lever_arms=offsets,
    )
    print(wrapper.set_configuration(depth_configuration))
import sonardyne_api as son

with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    # Reference the depth data source by the UID named "TCP-8110", set the
    # lever arms, apply the configuration and print the returned value.
    depth_input = son.DepthDataSourceReference(set_uid=son.UniqueID(name="TCP-8110"))
    offsets = son.LeverArms(forward_metres=1.5, starboard_metres=2, down_metres=3.2)
    depth_configuration = son.DepthSourceConfiguration(
        input_depth_data_source_reference=depth_input,
        lever_arms=offsets,
    )
    print(wrapper.set_configuration(depth_configuration))
import json, socket

# SetConfigurationRequest carrying a DepthSourceConfiguration.
request = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
            "configuration": {
                "@type": "type.googleapis.com/sonardyne.api.sources.depth_source.DepthSourceConfiguration",
                "inputDepthDataSourceReference": {
                    "setUid": {
                        "name": "TCP-8110"
                    }
                },
                "leverArms": {
                    "forwardMetres": 1.5,
                    "starboardMetres": 2.0,
                    "downMetres": 3.2
                }
            }
        }
    ]
}

# The connection is named `sock` so the `socket` module is not shadowed, and
# the `with` block closes it when the script leaves the block.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect(('0.0.0.0', 8103))
    sock.sendall(json.dumps(request, indent=4).encode())

Ports

The Port List component represents some collection of data inputs/outputs.

Some port lists, e.g. ethernet port lists, can have entries added and removed. Other port lists, e.g. serial port lists, represent physical ports on the device, so are fixed-length.

On a data port such as a SerialPort or an EthernetPort, it is possible to send and receive PortComms using the CommsService.

Serial Port

Serial Port List Configuration

Example code: serial_port_list_configuration.py

import sonardyne_api as son

with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    # One SerialPort entry per baud rate, in list order.
    baud_rates = (son.SerialPort.BAUD_RATE_9600, son.SerialPort.BAUD_RATE_38400)
    ports = [son.SerialPort(baud_rate=rate) for rate in baud_rates]
    print(wrapper.set_configuration(son.SerialPortListConfiguration(serial_ports=ports)))
import sonardyne_api as son

with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    # One SerialPort entry per baud rate, in list order.
    baud_rates = (son.SerialPort.BAUD_RATE_9600, son.SerialPort.BAUD_RATE_38400)
    ports = [son.SerialPort(baud_rate=rate) for rate in baud_rates]
    print(wrapper.set_configuration(son.SerialPortListConfiguration(serial_ports=ports)))
import sonardyne_api as son

with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    # One SerialPort entry per baud rate, in list order.
    baud_rates = (son.SerialPort.BAUD_RATE_9600, son.SerialPort.BAUD_RATE_38400)
    ports = [son.SerialPort(baud_rate=rate) for rate in baud_rates]
    print(wrapper.set_configuration(son.SerialPortListConfiguration(serial_ports=ports)))
import json, socket

# SetConfigurationRequest carrying a SerialPortListConfiguration with two ports.
request = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
            "configuration": {
                "@type": "type.googleapis.com/sonardyne.api.port_lists.serial_port_list.SerialPortListConfiguration",
                "serialPorts": [
                    {
                        "baudRate": "BAUD_RATE_9600"
                    },
                    {
                        "baudRate": "BAUD_RATE_38400"
                    }
                ]
            }
        }
    ]
}

# The connection is named `sock` so the `socket` module is not shadowed, and
# the `with` block closes it when the script leaves the block.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect(('0.0.0.0', 8103))
    sock.sendall(json.dumps(request, indent=4).encode())

Ethernet Port

Ethernet Port List Configuration

Example code: ethernet_port_list_configuration.py

import sonardyne_api as son

with son.WrapperGrpc('0.0.0.0', 8103) as device:  # SPRINT-Nav ip address & port
    # Three ethernet ports: a TCP server, a TCP client and a UDP socket.
    tcp_server_port = son.EthernetPort(source_port=8110, tcp_server=son.TcpServerParameters())
    tcp_client_port = son.EthernetPort(
        source_port=8111,
        tcp_client=son.TcpClientParameters(server_ip_address="192.168.179.60"),
    )
    udp_port = son.EthernetPort(
        source_port=8112,
        udp_socket=son.UdpSocketParameters(destination_ip_address="192.168.179.70"),
    )
    port_list = son.EthernetPortListConfiguration(
        ethernet_ports=[tcp_server_port, tcp_client_port, udp_port]
    )
    print(device.set_configuration(port_list))
import sonardyne_api as son

with son.WrapperFramedTcp('0.0.0.0', 8103) as device:  # SPRINT-Nav ip address & port
    # Three ethernet ports: a TCP server, a TCP client and a UDP socket.
    tcp_server_port = son.EthernetPort(source_port=8110, tcp_server=son.TcpServerParameters())
    tcp_client_port = son.EthernetPort(
        source_port=8111,
        tcp_client=son.TcpClientParameters(server_ip_address="192.168.179.60"),
    )
    udp_port = son.EthernetPort(
        source_port=8112,
        udp_socket=son.UdpSocketParameters(destination_ip_address="192.168.179.70"),
    )
    port_list = son.EthernetPortListConfiguration(
        ethernet_ports=[tcp_server_port, tcp_client_port, udp_port]
    )
    print(device.set_configuration(port_list))
import sonardyne_api as son

with son.WrapperJsonTcp('0.0.0.0', 8103) as device:  # SPRINT-Nav ip address & port
    # Three ethernet ports: a TCP server, a TCP client and a UDP socket.
    tcp_server_port = son.EthernetPort(source_port=8110, tcp_server=son.TcpServerParameters())
    tcp_client_port = son.EthernetPort(
        source_port=8111,
        tcp_client=son.TcpClientParameters(server_ip_address="192.168.179.60"),
    )
    udp_port = son.EthernetPort(
        source_port=8112,
        udp_socket=son.UdpSocketParameters(destination_ip_address="192.168.179.70"),
    )
    port_list = son.EthernetPortListConfiguration(
        ethernet_ports=[tcp_server_port, tcp_client_port, udp_port]
    )
    print(device.set_configuration(port_list))
import json, socket

# SetConfigurationRequest carrying an EthernetPortListConfiguration with a TCP
# server port, a TCP client port and a UDP socket port.
request = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
            "configuration": {
                "@type": "type.googleapis.com/sonardyne.api.port_lists.ethernet_port_list.EthernetPortListConfiguration",
                "ethernetPorts": [
                    {
                        "sourcePort": 8110,
                        "tcpServer": {}
                    },
                    {
                        "sourcePort": 8111,
                        "tcpClient": {
                            "serverIpAddress": "192.168.179.60"
                        }
                    },
                    {
                        "sourcePort": 8112,
                        "udpSocket": {
                            "destinationIpAddress": "192.168.179.70"
                        }
                    }
                ]
            }
        }
    ]
}

# The connection is named `sock` so the `socket` module is not shadowed, and
# the `with` block closes it when the script leaves the block.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect(('0.0.0.0', 8103))
    sock.sendall(json.dumps(request, indent=4).encode())

Trigger Port

Trigger Port List Configuration

Example code: trigger_port_list_configuration.py

import sonardyne_api as son

with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    # Request the configuration by message class and print what comes back.
    trigger_ports = wrapper.get_configuration(son.TriggerPortListConfiguration)
    print(trigger_ports)
import sonardyne_api as son

with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    # Request the configuration by message class and print what comes back.
    trigger_ports = wrapper.get_configuration(son.TriggerPortListConfiguration)
    print(trigger_ports)
import sonardyne_api as son

with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    # Request the configuration by message class and print what comes back.
    trigger_ports = wrapper.get_configuration(son.TriggerPortListConfiguration)
    print(trigger_ports)
import json, socket

# GetConfigurationRequest matching type names ending in
# "TriggerPortListConfiguration".
request = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.GetConfigurationRequest",
            "matchingCriteria": {
                "matchTypeNameSuffix": "TriggerPortListConfiguration"
            }
        }
    ]
}

# The connection is named `sock` so the `socket` module is not shadowed, and
# the `with` block closes it when the script leaves the block.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect(('0.0.0.0', 8103))
    sock.sendall(json.dumps(request, indent=4).encode())

External Control Port

External Control Port List Configuration

Example code: external_control_port_list_configuration.py

import sonardyne_api as son

with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    # Request the configuration by message class and print what comes back.
    external_control_ports = wrapper.get_configuration(son.ExternalControlPortListConfiguration)
    print(external_control_ports)
import sonardyne_api as son

with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    # Request the configuration by message class and print what comes back.
    external_control_ports = wrapper.get_configuration(son.ExternalControlPortListConfiguration)
    print(external_control_ports)
import sonardyne_api as son

with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    # Request the configuration by message class and print what comes back.
    external_control_ports = wrapper.get_configuration(son.ExternalControlPortListConfiguration)
    print(external_control_ports)
import json, socket

# GetConfigurationRequest matching type names ending in
# "ExternalControlPortListConfiguration".
request = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.GetConfigurationRequest",
            "matchingCriteria": {
                "matchTypeNameSuffix": "ExternalControlPortListConfiguration"
            }
        }
    ]
}

# The connection is named `sock` so the `socket` module is not shadowed, and
# the `with` block closes it when the script leaves the block.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect(('0.0.0.0', 8103))
    sock.sendall(json.dumps(request, indent=4).encode())

Power Pass Port

Power Pass Port List Configuration

Example code: power_pass_port_list_configuration.py

import sonardyne_api as son

with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    # One PowerPassPort entry per state, in list order.
    states = (
        son.PowerPassPort.POWER_PASS_STATE_TRIPPED,
        son.PowerPassPort.POWER_PASS_STATE_ENABLED,
    )
    ports = [son.PowerPassPort(power_pass_state=state) for state in states]
    print(wrapper.set_configuration(son.PowerPassPortListConfiguration(power_pass_ports=ports)))
import sonardyne_api as son

with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    # One PowerPassPort entry per state, in list order.
    states = (
        son.PowerPassPort.POWER_PASS_STATE_TRIPPED,
        son.PowerPassPort.POWER_PASS_STATE_ENABLED,
    )
    ports = [son.PowerPassPort(power_pass_state=state) for state in states]
    print(wrapper.set_configuration(son.PowerPassPortListConfiguration(power_pass_ports=ports)))
import sonardyne_api as son

with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    # One PowerPassPort entry per state, in list order.
    states = (
        son.PowerPassPort.POWER_PASS_STATE_TRIPPED,
        son.PowerPassPort.POWER_PASS_STATE_ENABLED,
    )
    ports = [son.PowerPassPort(power_pass_state=state) for state in states]
    print(wrapper.set_configuration(son.PowerPassPortListConfiguration(power_pass_ports=ports)))
import json
import socket

# RequestEnvelope holding one SetConfigurationRequest whose configuration is a
# PowerPassPortListConfiguration with two port entries.
request_envelope = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
            "configuration": {
                "@type": "type.googleapis.com/sonardyne.api.port_lists.power_pass_port_list.PowerPassPortListConfiguration",
                "powerPassPorts": [
                    {
                        "powerPassState": "POWER_PASS_STATE_TRIPPED"
                    },
                    {
                        "powerPassState": "POWER_PASS_STATE_ENABLED"
                    }
                ]
            }
        }
    ]
}

# Bind the connection to its own name (rebinding `socket` would shadow the
# imported module) and use `with` so the connection is closed after sending.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
    connection.connect(('0.0.0.0', 8103))
    connection.sendall(json.dumps(request_envelope, indent=4).encode())

Algorithms

The Algorithm component represents some internal algorithm, e.g. an INS algorithm.

INS Algorithm

INS Algorithm Command Reset

Example code: ins_algorithm_command_reset.py

import sonardyne_api as son

# Send an InsAlgorithmCommandReset through the WrapperGrpc transport and print
# the result.
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    reset_command = son.InsAlgorithmCommandReset()
    print(wrapper.send_command(reset_command))
import sonardyne_api as son

# Send an InsAlgorithmCommandReset through the WrapperFramedTcp transport and
# print the result.
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    reset_command = son.InsAlgorithmCommandReset()
    print(wrapper.send_command(reset_command))
import sonardyne_api as son

# Send an InsAlgorithmCommandReset through the WrapperJsonTcp transport and
# print the result.
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    reset_command = son.InsAlgorithmCommandReset()
    print(wrapper.send_command(reset_command))
import json
import socket

# RequestEnvelope holding one SendCommandRequest whose command is an
# InsAlgorithmCommandReset (the command itself carries no fields).
request_envelope = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.command_service.SendCommandRequest",
            "command": {
                "@type": "type.googleapis.com/sonardyne.api.algorithms.ins_algorithm.InsAlgorithmCommandReset"
            }
        }
    ]
}

# Bind the connection to its own name (rebinding `socket` would shadow the
# imported module) and use `with` so the connection is closed after sending.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
    connection.connect(('0.0.0.0', 8103))
    connection.sendall(json.dumps(request_envelope, indent=4).encode())

INS Algorithm Configuration

Example code: ins_algorithm_configuration.py

import sonardyne_api as son

# Apply an InsAlgorithmConfiguration with is_gnss_enabled=True and
# is_xpos_enabled=False, then print the result.
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    configuration = son.InsAlgorithmConfiguration(is_gnss_enabled=True, is_xpos_enabled=False)
    print(wrapper.set_configuration(configuration))
import sonardyne_api as son

# Apply an InsAlgorithmConfiguration with is_gnss_enabled=True and
# is_xpos_enabled=False, then print the result.
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    configuration = son.InsAlgorithmConfiguration(is_gnss_enabled=True, is_xpos_enabled=False)
    print(wrapper.set_configuration(configuration))
import sonardyne_api as son

# Apply an InsAlgorithmConfiguration with is_gnss_enabled=True and
# is_xpos_enabled=False, then print the result.
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    configuration = son.InsAlgorithmConfiguration(is_gnss_enabled=True, is_xpos_enabled=False)
    print(wrapper.set_configuration(configuration))
import json
import socket

# RequestEnvelope holding one SetConfigurationRequest whose configuration is an
# InsAlgorithmConfiguration.
#
# The flags must be the Python booleans True/False: the bare names true/false
# are undefined in Python and raise NameError. json.dumps serialises
# True/False as JSON true/false, so the bytes on the wire are as intended.
request_envelope = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
            "configuration": {
                "@type": "type.googleapis.com/sonardyne.api.algorithms.ins_algorithm.InsAlgorithmConfiguration",
                "isGnssEnabled": True,
                "isXposEnabled": False
            }
        }
    ]
}

# Bind the connection to its own name (rebinding `socket` would shadow the
# imported module) and use `with` so the connection is closed after sending.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
    connection.connect(('0.0.0.0', 8103))
    connection.sendall(json.dumps(request_envelope, indent=4).encode())

INS Algorithm Observation

Example code: ins_algorithm_observation.py

import sonardyne_api as son

with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    # Build the subscription bottom-up: matching criteria -> subscription ->
    # subscription request, using the type-name suffix "AidingObservation".
    criteria = son.MatchingCriteria(match_type_name_suffix="AidingObservation")
    subscription = son.ObservationSubscription(matching_criteria=criteria)
    subscription_request = son.ObservationSubscriptionRequest(
        observation_subscriptions=[subscription]
    )
    observation_stream = wrapper.open_observation_stream(subscription_request)

    print("\n(listening to observation stream) press ctrl-c to stop stream")
    try:
        # Poll indefinitely; recv() returns an iterable of messages.
        while True:
            for message in observation_stream.recv():
                print(type(message), message)
                print("(listening to observation stream) press ctrl-c to stop stream")
    except KeyboardInterrupt:
        # Ctrl-C is the advertised way to stop listening, so exit quietly.
        pass
import sonardyne_api as son

with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    # Build the subscription bottom-up: matching criteria -> subscription ->
    # subscription request, using the type-name suffix "AidingObservation".
    criteria = son.MatchingCriteria(match_type_name_suffix="AidingObservation")
    subscription = son.ObservationSubscription(matching_criteria=criteria)
    subscription_request = son.ObservationSubscriptionRequest(
        observation_subscriptions=[subscription]
    )
    observation_stream = wrapper.open_observation_stream(subscription_request)

    print("\n(listening to observation stream) press ctrl-c to stop stream")
    try:
        # Poll indefinitely; recv() returns an iterable of messages.
        while True:
            for message in observation_stream.recv():
                print(type(message), message)
                print("(listening to observation stream) press ctrl-c to stop stream")
    except KeyboardInterrupt:
        # Ctrl-C is the advertised way to stop listening, so exit quietly.
        pass
import sonardyne_api as son

with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    # Build the subscription bottom-up: matching criteria -> subscription ->
    # subscription request, using the type-name suffix "AidingObservation".
    criteria = son.MatchingCriteria(match_type_name_suffix="AidingObservation")
    subscription = son.ObservationSubscription(matching_criteria=criteria)
    subscription_request = son.ObservationSubscriptionRequest(
        observation_subscriptions=[subscription]
    )
    observation_stream = wrapper.open_observation_stream(subscription_request)

    print("\n(listening to observation stream) press ctrl-c to stop stream")
    try:
        # Poll indefinitely; recv() returns an iterable of messages.
        while True:
            for message in observation_stream.recv():
                print(type(message), message)
                print("(listening to observation stream) press ctrl-c to stop stream")
    except KeyboardInterrupt:
        # Ctrl-C is the advertised way to stop listening, so exit quietly.
        pass
import json
import socket

# RequestEnvelope wrapping an ObservationEnvelope that carries one
# ObservationSubscriptionRequest, matching on the type-name suffix
# "AidingObservation".
request_envelope = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationEnvelope",
            "observationMessages": [
                {
                    "@type": "type.googleapis.com/sonardyne.api.services.observation_service.ObservationSubscriptionRequest",
                    "observationSubscriptions": [
                        {
                            "matchingCriteria": {
                                "matchTypeNameSuffix": "AidingObservation"
                            }
                        }
                    ]
                }
            ]
        }
    ]
}

# Bind the connection to its own name (rebinding `socket` would shadow the
# imported module) and use `with` so the connection is always closed.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
    connection.connect(('0.0.0.0', 8103))
    connection.sendall(json.dumps(request_envelope, indent=4).encode())

    # recv() returns b"" once the peer has closed the connection. Stop reading
    # at that point instead of spinning forever on empty reads.
    while data := connection.recv(10240):
        print(data.decode())

DVL Algorithm

DVL Algorithm Configuration

Example code: dvl_algorithm_configuration.py

import sonardyne_api as son

with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    # Build each part of the DVL algorithm configuration separately, then
    # assemble and send it.
    rate = son.DvlRate(value=son.DvlRate.DvlRateEnum.DVL_RATE_ENUM_FIXED_5HZ)
    mode = son.DvlMode(value=son.DvlMode.DVL_MODE_ENUM_DVL)
    adcp = son.DvlAdcpParameters(
        number_of_cells=son.BoundedUInt32(value=100),
        preferred_cell_width_metres=son.BoundedDouble(value=0.1),
        pings_to_average=son.BoundedUInt32(value=10),
        dvl_adcp_ping_ratio=son.BoundedUInt32(value=10),
        velocity_limit=son.AdcpVelocityLimit(value=son.AdcpVelocityLimit.ADCP_VELOCITY_LIMIT_ENUM_STANDARD)
    )
    trigger = son.DvlInputTriggerParameters(
        input_trigger_port=son.TriggerPortReference(set_uid=son.UniqueID(name="Trigger A1")),
        input_trigger_edge=son.TRIGGER_EDGE_RISING,
        input_trigger_delay_seconds=son.BoundedDouble(value=0.1)
    )
    configuration = son.DvlAlgorithmConfiguration(
        dvl_rate=rate,
        dvl_mode=mode,
        adcp_parameters=adcp,
        input_trigger_parameters=trigger
    )
    print(wrapper.set_configuration(configuration))
import sonardyne_api as son

with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    # Build each part of the DVL algorithm configuration separately, then
    # assemble and send it.
    rate = son.DvlRate(value=son.DvlRate.DvlRateEnum.DVL_RATE_ENUM_FIXED_5HZ)
    mode = son.DvlMode(value=son.DvlMode.DVL_MODE_ENUM_DVL)
    adcp = son.DvlAdcpParameters(
        number_of_cells=son.BoundedUInt32(value=100),
        preferred_cell_width_metres=son.BoundedDouble(value=0.1),
        pings_to_average=son.BoundedUInt32(value=10),
        dvl_adcp_ping_ratio=son.BoundedUInt32(value=10),
        velocity_limit=son.AdcpVelocityLimit(value=son.AdcpVelocityLimit.ADCP_VELOCITY_LIMIT_ENUM_STANDARD)
    )
    trigger = son.DvlInputTriggerParameters(
        input_trigger_port=son.TriggerPortReference(set_uid=son.UniqueID(name="Trigger A1")),
        input_trigger_edge=son.TRIGGER_EDGE_RISING,
        input_trigger_delay_seconds=son.BoundedDouble(value=0.1)
    )
    configuration = son.DvlAlgorithmConfiguration(
        dvl_rate=rate,
        dvl_mode=mode,
        adcp_parameters=adcp,
        input_trigger_parameters=trigger
    )
    print(wrapper.set_configuration(configuration))
import sonardyne_api as son

with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    # Build each part of the DVL algorithm configuration separately, then
    # assemble and send it.
    rate = son.DvlRate(value=son.DvlRate.DvlRateEnum.DVL_RATE_ENUM_FIXED_5HZ)
    mode = son.DvlMode(value=son.DvlMode.DVL_MODE_ENUM_DVL)
    adcp = son.DvlAdcpParameters(
        number_of_cells=son.BoundedUInt32(value=100),
        preferred_cell_width_metres=son.BoundedDouble(value=0.1),
        pings_to_average=son.BoundedUInt32(value=10),
        dvl_adcp_ping_ratio=son.BoundedUInt32(value=10),
        velocity_limit=son.AdcpVelocityLimit(value=son.AdcpVelocityLimit.ADCP_VELOCITY_LIMIT_ENUM_STANDARD)
    )
    trigger = son.DvlInputTriggerParameters(
        input_trigger_port=son.TriggerPortReference(set_uid=son.UniqueID(name="Trigger A1")),
        input_trigger_edge=son.TRIGGER_EDGE_RISING,
        input_trigger_delay_seconds=son.BoundedDouble(value=0.1)
    )
    configuration = son.DvlAlgorithmConfiguration(
        dvl_rate=rate,
        dvl_mode=mode,
        adcp_parameters=adcp,
        input_trigger_parameters=trigger
    )
    print(wrapper.set_configuration(configuration))
import json
import socket

# RequestEnvelope holding one SetConfigurationRequest whose configuration is a
# DvlAlgorithmConfiguration (rate, mode, input trigger and ADCP parameters).
request_envelope = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
            "configuration": {
                "@type": "type.googleapis.com/sonardyne.api.algorithms.dvl_algorithm.DvlAlgorithmConfiguration",
                "dvlRate": {
                    "value": "DVL_RATE_ENUM_FIXED_5HZ"
                },
                "dvlMode": {
                    "value": "DVL_MODE_ENUM_DVL"
                },
                "inputTriggerParameters": {
                    "inputTriggerPort": {
                        "setUid": {
                            "name": "Trigger A1"
                        }
                    },
                    "inputTriggerEdge": "TRIGGER_EDGE_RISING",
                    "inputTriggerDelaySeconds": {
                        "value": 0.1
                    }
                },
                "adcpParameters": {
                    "numberOfCells": {
                        "value": 100
                    },
                    "preferredCellWidthMetres": {
                        "value": 0.1
                    },
                    "pingsToAverage": {
                        "value": 10
                    },
                    "dvlAdcpPingRatio": {
                        "value": 10
                    },
                    "velocityLimit": {
                        "value": "ADCP_VELOCITY_LIMIT_ENUM_STANDARD"
                    }
                }
            }
        }
    ]
}

# Bind the connection to its own name (rebinding `socket` would shadow the
# imported module) and use `with` so the connection is closed after sending.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
    connection.connect(('0.0.0.0', 8103))
    connection.sendall(json.dumps(request_envelope, indent=4).encode())

AHRS Algorithm

AHRS Algorithm Configuration

Example code: ahrs_algorithm_configuration.py

import sonardyne_api as son

# Apply an AhrsAlgorithmConfiguration with a coarse latitude/longitude
# (both fields are in radians, per their names) and print the result.
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    configuration = son.AhrsAlgorithmConfiguration(
        coarse_position_latitude_radians=0.1,
        coarse_position_longitude_radians=0.2
    )
    print(wrapper.set_configuration(configuration))
import sonardyne_api as son

# Apply an AhrsAlgorithmConfiguration with a coarse latitude/longitude
# (both fields are in radians, per their names) and print the result.
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    configuration = son.AhrsAlgorithmConfiguration(
        coarse_position_latitude_radians=0.1,
        coarse_position_longitude_radians=0.2
    )
    print(wrapper.set_configuration(configuration))
import sonardyne_api as son

# Apply an AhrsAlgorithmConfiguration with a coarse latitude/longitude
# (both fields are in radians, per their names) and print the result.
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    configuration = son.AhrsAlgorithmConfiguration(
        coarse_position_latitude_radians=0.1,
        coarse_position_longitude_radians=0.2
    )
    print(wrapper.set_configuration(configuration))
import json
import socket

# RequestEnvelope holding one SetConfigurationRequest whose configuration is an
# AhrsAlgorithmConfiguration with a coarse latitude/longitude in radians.
request_envelope = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.configuration_service.SetConfigurationRequest",
            "configuration": {
                "@type": "type.googleapis.com/sonardyne.api.algorithms.ahrs_algorithm.AhrsAlgorithmConfiguration",
                "coarsePositionLatitudeRadians": 0.1,
                "coarsePositionLongitudeRadians": 0.2
            }
        }
    ]
}

# Bind the connection to its own name (rebinding `socket` would shadow the
# imported module) and use `with` so the connection is closed after sending.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
    connection.connect(('0.0.0.0', 8103))
    connection.sendall(json.dumps(request_envelope, indent=4).encode())

AHRS Algorithm Command Reset

Example code: ahrs_algorithm_command_reset.py

import sonardyne_api as son

# Send an AhrsAlgorithmCommandReset through the WrapperGrpc transport and print
# the result.
with son.WrapperGrpc('0.0.0.0', 8103) as wrapper:
    reset_command = son.AhrsAlgorithmCommandReset()
    print(wrapper.send_command(reset_command))
import sonardyne_api as son

# Send an AhrsAlgorithmCommandReset through the WrapperFramedTcp transport and
# print the result.
with son.WrapperFramedTcp('0.0.0.0', 8103) as wrapper:
    reset_command = son.AhrsAlgorithmCommandReset()
    print(wrapper.send_command(reset_command))
import sonardyne_api as son

# Send an AhrsAlgorithmCommandReset through the WrapperJsonTcp transport and
# print the result.
with son.WrapperJsonTcp('0.0.0.0', 8103) as wrapper:
    reset_command = son.AhrsAlgorithmCommandReset()
    print(wrapper.send_command(reset_command))
import json
import socket

# RequestEnvelope holding one SendCommandRequest whose command is an
# AhrsAlgorithmCommandReset (the command itself carries no fields).
request_envelope = {
    "@type": "type.googleapis.com/sonardyne.api.common.envelope.RequestEnvelope",
    "uid": 1,
    "requests": [
        {
            "@type": "type.googleapis.com/sonardyne.api.services.command_service.SendCommandRequest",
            "command": {
                "@type": "type.googleapis.com/sonardyne.api.algorithms.ahrs_algorithm.AhrsAlgorithmCommandReset"
            }
        }
    ]
}

# Bind the connection to its own name (rebinding `socket` would shadow the
# imported module) and use `with` so the connection is closed after sending.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
    connection.connect(('0.0.0.0', 8103))
    connection.sendall(json.dumps(request_envelope, indent=4).encode())