Code examples

Note

Examples assume that default contents of boot.py has been executed.

Virtual depth sounder using real data

This program illustrates the approach to solving the problem of non-calibrated or partially broken instruments. Many old instruments, such as echo sounders, require an accompanying instrument display to calibrate them. When this is broken, it is difficult to find a replacement. The solution is to create a virtual echo sounder that uses the data from the real sensor, but with a user-defined sensor offset. Modern multi-function displays allow the selection of the data source, so the user only needs to select the virtual echo sounder in the settings instead of the real depth sounder.

The following screenshots are taken from the Raymarine Axiom display.

_images/rm_depth_1.png _images/rm_depth_2.png
OFFSET = int(1.0 * 1000) # transduser offset in meters * 1000

depth_data = bytearray(8) # Water Depth PGN data storage

def rx_depth(n2k, data, msgid, seq=-1):
    global depth_data
    # copy entire message
    depth_data[:] = data
    # replace 'offset' field
    depth_data[5] = OFFSET & 0xFF
    depth_data[6] = (OFFSET >> 8) & 0xFF
    # send modified message
    n2k.sendpgn(depth_data, 128267)

# Change Gateway's device type to 'Bottom Depth'
n2k.address_claim(devfunc=130, devclass=60)

# Change Gateway's model version to 'Depth Transducer'
n2k.product_info(modelver='Depth Transducer')

# Register callback on depth data reception
# Data source address can be specified by adding `src=<address>`
n2k.rxcallback(0, rx_depth, 128267)

Reception of different types of NMEA 2000 messages

This program illustrates the cascading and priority of receive callback. The first callback (with index 0) uses mask to catch all non-addressable messages. The callback returns False, which means that processed messages are not checked for matches with other callbacks. The next callback receives all messages, but thanks to the first callback it will only process addressable messages. It returns None, so all processed messages are also checked against callbacks with a higher index. And the last callback, with index 2, is only set for PGN 126720.

led.state(led.RED) # set default color of 6th flash

def rx_global(n2k, data, msgid, seq=-1):
    pgn, dst, prio, src = parseid(msgid)
    # dst always equal BROADCAST here, because global PGNs don't have destination address
    print(f'Global PGN {pgn} sent by {src} with priority {prio}.')
    return False # don't pass messages to next callback

def rx_addressable(n2k, data, msgid, seq=-1):
    pgn, dst, prio, src = parseid(msgid)
    print(f'PGN {pgn} sent by {src} to {dst} with priority {prio}.')
    return None # pass messages to next callback (line can be omitted)

def rx_prop_fast_addr(n2k, data, msgid, seq): # no need to have default value for 'seq'
    print("Proprietary Addressable Fast-Packet")
    led.green() # signal reception of PGN 126720

n2k.rxcallback(0, rx_global, mask=MASK_GLOBAL) # receive all non-addressable PGNs
n2k.rxcallback(1, rx_addressable) # global PGNs already blocked, so we receive only addressable
n2k.rxcallback(2, rx_prop_fast_addr, 126720) # receive proprietary addressable fast-packet PGN

Custom NMEA 2000 configuration commands

Two installation description fields on NMEA 2000 devices are used by installers to leave contact information or information about the location of the device. They can also be used to send commands to the unit. The program below allows you to control the color of the Gateway’s LED. The NMEA 2000 gateway and software are required to send commands. If you have a Yacht Devices gateway, use the free CAN Log Viewer software. If you have two Python gateways, you can turn one into an NMEA 2000 gateway with the program below.

led.manual(True) # constant-color mode

# Accepts commands in format 'CMD:LED RED'
def cfg_clbk(n2k, install1, install2):
    if install1 != None and install1.startswith('CMD:'):
        args = install1[4:].split(' ') # exclude 'CMD:' and split into arguments

        if args[0] == "LED":
            success = len(args) == 2 # command accepts only one argument
            if success:
                if args[1] == 'OFF':
                    led.off()
                elif args[1] == 'RED':
                    led.red()
                elif args[1] == 'GREEN':
                    led.green()
                else:
                    success = False

            if success:
                n2k.config_info(install1 = install1 + ' DONE')
            else:
                n2k.config_info(install1 = 'CMD:LED') # erase wrong arguments

n2k.cfgcallback(cfg_clbk)

LED control based on digital switching

This program turns the Python Gateway into the NMEA 2000 digital switching unit that controls the Gateway’s LED. Unfortunately, although the NMEA 2000 Standard introduced digital switching messages many years ago, most of the big manufacturers preferred to support proprietary protocols (CZone, EmpirBus) and most likely you won’t be able to manage LED from your MFD. If you have one of our NMEA 2000 gateways, you can use CAN Log Viewer to play with LEDs or Web Gauges.

BANK = 7 # bank instance to select in Web Gauges

bin_status = bytearray((BANK, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF)) # PGN data
led.manual(True) # constant-color LED mode

def ds_send_update():
    # set active channel based on current LED color
    if led.state() == led.RED:
        bin_status[1] = 0xF0 | 0x1
    elif led.state() == led.GREEN:
        bin_status[1] = 0xF0 | 0x4
    else:
        bin_status[1] = 0xF0
    n2k.sendpgn(bin_status, 127501) # send 'Switch Bank Status' PGN

def ds_clbk(n2k, data, msgid, seq=-1):
    if data[0] == BANK:
        if data[1] & 0x3 == 0x1:
            led.red()
        elif data[1] & 0xC == 0x4:
            led.green()
        else:
            led.off()
        ds_send_update()

n2k.rxcallback(0, ds_clbk, 127502) # receive 'Switch Bank Control' PGN

while True:
    ds_send_update() # periodically send 'Switch Bank Status' PGN.
    time.sleep(3)

AIS converter from NMEA 0183 to NMEA 2000

This converter is limited to messages 1, 2, 3 (AIS Class A Position Report) and 14 (AIS Safety-Related Broadcast Message), while the full AIS decoder supports 12 types of AIS messages. Decoding AIS messages is the most complex task in NMEA gateways because it requires unpacking the binary message from the NMEA 0183 envelope and converting units to those used in NMEA 2000. We have included this example to illustrate the power of the Python language and the performance of the Gateway: message conversion takes about 1.5 milliseconds. This is more than enough to process AIS data in real time.

import struct
from math import pi

PACK_FMT_POS_REPORT = b'<BIiiBHHHBHHBBB'
PACK_FMT_SAFETY_MSG = b'<BIBBB' # not including text itself

PACK_FMT_POS_REPORT_SIZE = struct.calcsize(PACK_FMT_POS_REPORT)
pos_report_out = bytearray(PACK_FMT_POS_REPORT_SIZE)

led.state(led.RED) # set default color of 6th flash

class AIS: # NMEA0183 -> NMEA2000 converter

    # Reads unsigned integer with specified bit size
    def uint(self, bit_size):
        ret = bits_uint(self.data, self.bit_offset, bit_size)
        self.bit_offset += bit_size
        return ret

    # Reads signed integer with specified bit size (MSBit is sign)
    def int(self, bit_size):
        ret = bits_int(self.data, self.bit_offset, bit_size)
        self.bit_offset += bit_size
        return ret

    # Reads longitude and converts it to NMEA2000 format
    def longitude(self): # 28 bits
        uint = self.int(28)
        if uint == 0x6791AC0: # invalid value
            return 0x7fffffff
        #uint = ~((~uint) & 0x4FFFFFF) if (uint & 0x8000000) else uint # fix sign
        return (uint * 50) // 3

    # Reads latitude and converts it to NMEA2000 format
    def latitude(self): # 27 bits
        uint = self.int(27)
        if uint == 0x3412140: # invalid value
            return 0x7fffffff
        #uint = ~((~uint) & 0x3FFFFFF) if (uint & 0x4000000) else uint # fix sign
        return (uint * 50) // 3

    # Reads COG and converts it to NMEA2000 format
    def cog(self): # 12 bits
        uint = self.uint(12)
        if uint >= 3600: # invalid value
            return 0xffff
        return uint * 1047215 // 60001

    # Reads SOG and converts it to NMEA2000 format
    def sog(self): # 10 bits
        uint = self.uint(10)
        if uint >= 1023: # invalid value
            return 0xffff
        uint *= 1852
        return uint // 360 + ((uint // 36) >= 5)

    # Reads heading and converts it to NMEA2000 format
    def hdg(self): # 9 bits
        uint = self.uint(9)
        if uint >= 360: # invalid value
            return 0xffff
        return uint * 10472152 // 60001

    # Reads ROT and converts it to NMEA2000 format
    def rot(self): # 8 bits
        uint = self.uint(8)
        if uint == 0x80: # invalid value
            return 0x7fff
        if uint == 0: # skip calculations if 0
            return 0
        sign = (uint & 0x80) != 0
        uint = (~uint) & 0x7F if sign else uint
        rot = uint / 4.733
        rot = (rot * rot) * pi / (180 * 60) * 32000
        if rot > 32000:
            rot = 32000
        return int(-rot if sign else rot)

    # Reads text string
    def str(self, str_len = 0): # (6 * str_len) bits
        if str_len <= 0:
            str_len = (self.bit_size - self.bit_offset) // 6 + str_len
        ba = bytearray(str_len)
        for i in range(str_len):
            uint = self.uint(6)
            ba[i] = uint if (uint >= 32) else (uint + 64)
        return ba.rstrip(b'@')

    def __init__(self):
        self.data = bytearray(100)

    # Initializes decoder with specified binary data
    def init(self, data, fill_bits = 0):
        decode6bit(data, self.data) # ASCII -> Binary
        self.bit_size = len(self.data) * 8 - fill_bits
        self.bit_offset = 0

        self.msg_id = self.uint(6)
        self.repeat_ind = self.uint(2)
        self.mmsi = self.uint(30)

        return (self.msg_id, self.repeat_ind, self.mmsi)

ais = AIS() # AIS decoder

# Converts received NMEA0183 AIS message and sends it to NMEA2000
def ais_process(tx, count, num, seq, ch_b, data, fill_bits):
    global ais, pos_report_out
    global PACK_FMT_POS_REPORT, PACK_FMT_SAFETY_MSG

    msg_id, repeat_ind, mmsi = ais.init(data, fill_bits)

    msg_repeat_ind = msg_id | (repeat_ind << 6)
    tx_info = (tx << 1) | ch_b;

    if msg_id == 1 or msg_id == 2 or msg_id == 3: # Class A Position Report
        nav_status = ais.uint(4)
        rot = ais.rot()
        sog = ais.sog()
        pos_acc = ais.uint(1)
        longitude = ais.longitude()
        latitude = ais.latitude()
        cog = ais.cog()
        hdg = ais.hdg()
        timestamp = ais.uint(6)
        maneuver_ind = ais.uint(2)
        ais_spare = ais.uint(3)
        raim_flag = ais.uint(1)
        comm_state = ais.uint(19)

        struct.pack_into(PACK_FMT_POS_REPORT, pos_report_out, 0,
            msg_repeat_ind, mmsi, longitude, latitude,
            pos_acc | (raim_flag << 1) | (timestamp << 2), cog, sog,
            comm_state & 0xFFFF, (comm_state >> 16) | (tx_info << 3), hdg, rot,
            nav_status | (maneuver_ind << 4) | 0xC0, ais_spare, 0xFF)

        # send PGN "AIS Class A Position Report"
        _ = n2k.sendpgn(pos_report_out, 129038, prio=4)
        led.green() # indicate successful AIS reception
    elif msg_id == 14: # Safety-Related Broadcast Message
        ais_spare = ais.uint(2)
        text = ais.str() # read until end of the message

        safety_out = struct.pack(PACK_FMT_SAFETY_MSG,
             msg_repeat_ind, mmsi,
             0x01 | (tx_info << 1) | (ais_spare << 6),
             len(text) + 2, 0x1)

        # add string separatly, because pack() does not support variable-length strings
        safety_out += text
        safety_out += b'\xFF' # sequence number

        # send PGN "AIS Safety-Related Broadcast Message"
        _ = n2k.sendpgn(safety_out, 129802, prio=5)
        led.green() # indicate successful AIS reception

def rx_ais(n0183, line):
    args = line.split(b'*')[0].split(b',') # remove checksum, split arguments
    if len(args) == 7:
        args[0] = args[0][-1] == ord(b'O') # is TX message?
        args[1] = int(args[1]) # number of sentences in message
        args[2] = int(args[2]) # index of sentence in sequence
        args[4] = args[4] == b'B' # is channel B used?
        args[6] = int(args[6]) # number of fill bits
        if args[1] == 1 and args[2] == 1: # process only single-sentence messages
            ais_process(*args)
    return False

n0183.rxcallback(0, rx_ais, '!AIVDM') # received AIS
n0183.rxcallback(1, rx_ais, '!AIVDO') # transmitted AIS

NMEA 2000 USB Gateway, RAW protocol

There is no need to purchase our USB Gateway or NMEA 2000 Wi-Fi Gateway just to use it with the CAN Log Viewer. This program is a bi-directional converter from NMEA 2000 protocol to RAW format, which is also used in the Expedition 10 navigation software.

import sys

def tx_clbk(can, data, msgid, state, unique):
    try:
        if state == can.TX_DONE: # frame sent successfuly
            print(str(toraw(data, msgid, True), 'utf-8'), end='')
    except:
        pass # don't print possible errors, they can break PC application

def rx_clbk(n2k, data, msgid, seq=-1):
    try:
        print(str(toraw(data, msgid, False), 'utf-8'), end='')
    except:
        pass # don't print possible errors, they can break PC application

def state_clbk(state):
    if state == ydpg.USB_COM_PORT:
        can.txcallback(tx_clbk)
        n2k.rxcallback(0, rx_clbk)
        led.state(led.GREEN) # connected to PC application, set 6th flash green
    else:
        n2k.rxcallback(0,None) # removing callbacks to prevent writing to COM port
        can.txcallback(None)
        ydpg.usbclear() # erase old messages
        led.state(led.RED) # disconnected from PC application, set 6th flash red

n2k.assemble(False) # call rx_clbk for separate frames only
n2k.loopback(True) # enables processing of sent messages (required for ISO Requests)

state_clbk(ydpg.usbstate()) # call USB callback one time to set callbacks and LED color
ydpg.usbcallback(state_clbk) # register callback on USB state change

# main loop to process messages sent by PC application
while True:
    try:
        # receives line from PC application and splits it into arguments
        args = sys.stdin.readline().rstrip().split(' ')
        # remove possible 'start of text' character
        # (sent when PC application is connecting to YDPG)
        args[0] = args[0].lstrip('\x02')
        # parses HEX message ID and replaces source address
        msgid = (int(args[0], 16) & 0x1FFFFF00) | n2k.address()
        # turns HEX data into byte buffer
        data = bytes([int(i, 16) for i in args[1:]])
        # sends frame to NMEA 2000 bus
        n2k.send(data, msgid, type = n2k.SINGLE)
    except KeyboardInterrupt:
        break # exit program by pressing Ctrl+C
    except:
        pass # don't print possible errors, they can break PC application

# returns YDPG-01 to default state
ydpg.usbcallback(None)
n2k.rxcallback(0, None)
can.txcallback(None)
n2k.loopback(False)
n2k.assemble(True)
led.state(led.OFF)
del state_clbk
del tx_clbk
del rx_clbk

BRP Rotax engine gateway, 11-bit CAN frames

This program demonstrates the handling of 11-bit CAN frames using the BRP Rotax (Bombardier Recreational Products) engine protocol as an example. It supports two engines and converts engine RPM and boost pressure to NMEA 2000. If you need a converter that supports more data types, see our Engine Gateway.

import struct
import time

led.state(led.RED) # set default color of 6th flash

engine_updated = [False, False]
engine_kPa = [0, 0] # boost pressure in NMEA2000 format
engine_rpm = [0, 0] # engine speed in NMEA2000 format

def rx_rotax(can, fifo):
    global engine_updated, engine_kPa, engine_rpm
    msgid, std, rtr, fmi, data = can.recv(1) # get received CAN frame

    engine_idx = int((msgid & 0x0F0) == 0x000)

    if msgid == 0x300 or msgid == 0x3A0: # boost pressure received
        engine_kPa[engine_idx] = data[5] * 10
    elif msgid == 0x102 or msgid == 0x1A2: # engine speed received
        engine_rpm[engine_idx] = (data[1] | (data[2] << 8)) // 2

    engine_updated[engine_idx] = True # set 'received' flag

can.setfilter(1, can.LIST16, 1, (0x102, 0x1A2, 0x300, 0x3A0)) # allow specified id
can.rxcallback(1, rx_rotax) # register callback on reception of CAN frame

engine_data = bytearray(8) # frame data for PGN "Engine Parameters, Rapid Update"

while True:
    time.sleep(0.1) # PGN transmitted every 100 ms for each engine
    for idx in range(2):
        if engine_updated[idx]: # is engine data received?
            engine_updated[idx] = False # clear 'received' flag
            struct.pack_into(b'<BHHbH', engine_data, 0,
                idx, # engine instance
                engine_rpm[idx], # engine speed
                engine_kPa[idx], # engine boost pressure
                0x7F, # engine tilt/trim
                0xFFFF) # reserved
            _ = n2k.sendpgn(engine_data, 127488) # send PGN "Engine Parameters, Rapid Update"
            led.green() # indicate successful conversion