Code examples
Note
Examples assume that default contents of boot.py
has been executed.
Virtual depth sounder using real data
This program illustrates the approach to solving the problem of non-calibrated or partially broken instruments. Many old instruments, such as echo sounders, require an accompanying instrument display to calibrate them. When this is broken, it is difficult to find a replacement. The solution is to create a virtual echo sounder that uses the data from the real sensor, but with a user-defined sensor offset. Modern multi-function displays allow the selection of the data source, so the user only needs to select the virtual echo sounder in the settings instead of the real depth sounder.
The following screenshots are taken from the Raymarine Axiom display.
OFFSET = int(1.0 * 1000) # transduser offset in meters * 1000
depth_data = bytearray(8) # Water Depth PGN data storage
def rx_depth(n2k, data, msgid, seq=-1):
global depth_data
# copy entire message
depth_data[:] = data
# replace 'offset' field
depth_data[5] = OFFSET & 0xFF
depth_data[6] = (OFFSET >> 8) & 0xFF
# send modified message
n2k.sendpgn(depth_data, 128267)
# Change Gateway's device type to 'Bottom Depth'
n2k.address_claim(devfunc=130, devclass=60)
# Change Gateway's model version to 'Depth Transducer'
n2k.product_info(modelver='Depth Transducer')
# Register callback on depth data reception
# Data source address can be specified by adding `src=<address>`
n2k.rxcallback(0, rx_depth, 128267)
Reception of different types of NMEA 2000 messages
This program illustrates the cascading and priority of receive callback. The first callback (with index 0) uses mask to catch all non-addressable messages. The callback returns False, which means that processed messages are not checked for matches with other callbacks. The next callback receives all messages, but thanks to the first callback it will only process addressable messages. It returns None, so all processed messages are also checked against callbacks with a higher index. And the last callback, with index 2, is only set for PGN 126720.
led.state(led.RED) # set default color of 6th flash
def rx_global(n2k, data, msgid, seq=-1):
pgn, dst, prio, src = parseid(msgid)
# dst always equal BROADCAST here, because global PGNs don't have destination address
print(f'Global PGN {pgn} sent by {src} with priority {prio}.')
return False # don't pass messages to next callback
def rx_addressable(n2k, data, msgid, seq=-1):
pgn, dst, prio, src = parseid(msgid)
print(f'PGN {pgn} sent by {src} to {dst} with priority {prio}.')
return None # pass messages to next callback (line can be omitted)
def rx_prop_fast_addr(n2k, data, msgid, seq): # no need to have default value for 'seq'
print("Proprietary Addressable Fast-Packet")
led.green() # signal reception of PGN 126720
n2k.rxcallback(0, rx_global, mask=MASK_GLOBAL) # receive all non-addressable PGNs
n2k.rxcallback(1, rx_addressable) # global PGNs already blocked, so we receive only addressable
n2k.rxcallback(2, rx_prop_fast_addr, 126720) # receive proprietary addressable fast-packet PGN
Custom NMEA 2000 configuration commands
Two installation description fields on NMEA 2000 devices are used by installers to leave contact information or information about the location of the device. They can also be used to send commands to the unit. The program below allows you to control the color of the Gateway’s LED. The NMEA 2000 gateway and software are required to send commands. If you have a Yacht Devices gateway, use the free CAN Log Viewer software. If you have two Python gateways, you can turn one into an NMEA 2000 gateway with the program below.
led.manual(True) # constant-color mode
# Accepts commands in format 'CMD:LED RED'
def cfg_clbk(n2k, install1, install2):
if install1 != None and install1.startswith('CMD:'):
args = install1[4:].split(' ') # exclude 'CMD:' and split into arguments
if args[0] == "LED":
success = len(args) == 2 # command accepts only one argument
if success:
if args[1] == 'OFF':
led.off()
elif args[1] == 'RED':
led.red()
elif args[1] == 'GREEN':
led.green()
else:
success = False
if success:
n2k.config_info(install1 = install1 + ' DONE')
else:
n2k.config_info(install1 = 'CMD:LED') # erase wrong arguments
n2k.cfgcallback(cfg_clbk)
LED control based on digital switching
This program turns the Python Gateway into the NMEA 2000 digital switching unit that controls the Gateway’s LED. Unfortunately, although the NMEA 2000 Standard introduced digital switching messages many years ago, most of the big manufacturers preferred to support proprietary protocols (CZone, EmpirBus) and most likely you won’t be able to manage LED from your MFD. If you have one of our NMEA 2000 gateways, you can use CAN Log Viewer to play with LEDs or Web Gauges.
BANK = 7 # bank instance to select in Web Gauges
bin_status = bytearray((BANK, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF)) # PGN data
led.manual(True) # constant-color LED mode
def ds_send_update():
# set active channel based on current LED color
if led.state() == led.RED:
bin_status[1] = 0xF0 | 0x1
elif led.state() == led.GREEN:
bin_status[1] = 0xF0 | 0x4
else:
bin_status[1] = 0xF0
n2k.sendpgn(bin_status, 127501) # send 'Switch Bank Status' PGN
def ds_clbk(n2k, data, msgid, seq=-1):
if data[0] == BANK:
if data[1] & 0x3 == 0x1:
led.red()
elif data[1] & 0xC == 0x4:
led.green()
else:
led.off()
ds_send_update()
n2k.rxcallback(0, ds_clbk, 127502) # receive 'Switch Bank Control' PGN
while True:
ds_send_update() # periodically send 'Switch Bank Status' PGN.
time.sleep(3)
AIS converter from NMEA 0183 to NMEA 2000
This converter is limited to messages 1, 2, 3 (AIS Class A Position Report) and 14 (AIS Safety-Related Broadcast Message), while the full AIS decoder supports 12 types of AIS messages. Decoding AIS messages is the most complex task in NMEA gateways because it requires unpacking the binary message from the NMEA 0183 envelope and converting units to those used in NMEA 2000. We have included this example to illustrate the power of the Python language and the performance of the Gateway: message conversion takes about 1.5 milliseconds. This is more than enough to process AIS data in real time.
import struct
from math import pi
PACK_FMT_POS_REPORT = b'<BIiiBHHHBHHBBB'
PACK_FMT_SAFETY_MSG = b'<BIBBB' # not including text itself
PACK_FMT_POS_REPORT_SIZE = struct.calcsize(PACK_FMT_POS_REPORT)
pos_report_out = bytearray(PACK_FMT_POS_REPORT_SIZE)
led.state(led.RED) # set default color of 6th flash
class AIS: # NMEA0183 -> NMEA2000 converter
# Reads unsigned integer with specified bit size
def uint(self, bit_size):
ret = bits_uint(self.data, self.bit_offset, bit_size)
self.bit_offset += bit_size
return ret
# Reads signed integer with specified bit size (MSBit is sign)
def int(self, bit_size):
ret = bits_int(self.data, self.bit_offset, bit_size)
self.bit_offset += bit_size
return ret
# Reads longitude and converts it to NMEA2000 format
def longitude(self): # 28 bits
uint = self.int(28)
if uint == 0x6791AC0: # invalid value
return 0x7fffffff
#uint = ~((~uint) & 0x4FFFFFF) if (uint & 0x8000000) else uint # fix sign
return (uint * 50) // 3
# Reads latitude and converts it to NMEA2000 format
def latitude(self): # 27 bits
uint = self.int(27)
if uint == 0x3412140: # invalid value
return 0x7fffffff
#uint = ~((~uint) & 0x3FFFFFF) if (uint & 0x4000000) else uint # fix sign
return (uint * 50) // 3
# Reads COG and converts it to NMEA2000 format
def cog(self): # 12 bits
uint = self.uint(12)
if uint >= 3600: # invalid value
return 0xffff
return uint * 1047215 // 60001
# Reads SOG and converts it to NMEA2000 format
def sog(self): # 10 bits
uint = self.uint(10)
if uint >= 1023: # invalid value
return 0xffff
uint *= 1852
return uint // 360 + ((uint // 36) >= 5)
# Reads heading and converts it to NMEA2000 format
def hdg(self): # 9 bits
uint = self.uint(9)
if uint >= 360: # invalid value
return 0xffff
return uint * 10472152 // 60001
# Reads ROT and converts it to NMEA2000 format
def rot(self): # 8 bits
uint = self.uint(8)
if uint == 0x80: # invalid value
return 0x7fff
if uint == 0: # skip calculations if 0
return 0
sign = (uint & 0x80) != 0
uint = (~uint) & 0x7F if sign else uint
rot = uint / 4.733
rot = (rot * rot) * pi / (180 * 60) * 32000
if rot > 32000:
rot = 32000
return int(-rot if sign else rot)
# Reads text string
def str(self, str_len = 0): # (6 * str_len) bits
if str_len <= 0:
str_len = (self.bit_size - self.bit_offset) // 6 + str_len
ba = bytearray(str_len)
for i in range(str_len):
uint = self.uint(6)
ba[i] = uint if (uint >= 32) else (uint + 64)
return ba.rstrip(b'@')
def __init__(self):
self.data = bytearray(100)
# Initializes decoder with specified binary data
def init(self, data, fill_bits = 0):
decode6bit(data, self.data) # ASCII -> Binary
self.bit_size = len(self.data) * 8 - fill_bits
self.bit_offset = 0
self.msg_id = self.uint(6)
self.repeat_ind = self.uint(2)
self.mmsi = self.uint(30)
return (self.msg_id, self.repeat_ind, self.mmsi)
ais = AIS() # AIS decoder
# Converts received NMEA0183 AIS message and sends it to NMEA2000
def ais_process(tx, count, num, seq, ch_b, data, fill_bits):
global ais, pos_report_out
global PACK_FMT_POS_REPORT, PACK_FMT_SAFETY_MSG
msg_id, repeat_ind, mmsi = ais.init(data, fill_bits)
msg_repeat_ind = msg_id | (repeat_ind << 6)
tx_info = (tx << 1) | ch_b;
if msg_id == 1 or msg_id == 2 or msg_id == 3: # Class A Position Report
nav_status = ais.uint(4)
rot = ais.rot()
sog = ais.sog()
pos_acc = ais.uint(1)
longitude = ais.longitude()
latitude = ais.latitude()
cog = ais.cog()
hdg = ais.hdg()
timestamp = ais.uint(6)
maneuver_ind = ais.uint(2)
ais_spare = ais.uint(3)
raim_flag = ais.uint(1)
comm_state = ais.uint(19)
struct.pack_into(PACK_FMT_POS_REPORT, pos_report_out, 0,
msg_repeat_ind, mmsi, longitude, latitude,
pos_acc | (raim_flag << 1) | (timestamp << 2), cog, sog,
comm_state & 0xFFFF, (comm_state >> 16) | (tx_info << 3), hdg, rot,
nav_status | (maneuver_ind << 4) | 0xC0, ais_spare, 0xFF)
# send PGN "AIS Class A Position Report"
_ = n2k.sendpgn(pos_report_out, 129038, prio=4)
led.green() # indicate successful AIS reception
elif msg_id == 14: # Safety-Related Broadcast Message
ais_spare = ais.uint(2)
text = ais.str() # read until end of the message
safety_out = struct.pack(PACK_FMT_SAFETY_MSG,
msg_repeat_ind, mmsi,
0x01 | (tx_info << 1) | (ais_spare << 6),
len(text) + 2, 0x1)
# add string separatly, because pack() does not support variable-length strings
safety_out += text
safety_out += b'\xFF' # sequence number
# send PGN "AIS Safety-Related Broadcast Message"
_ = n2k.sendpgn(safety_out, 129802, prio=5)
led.green() # indicate successful AIS reception
def rx_ais(n0183, line):
args = line.split(b'*')[0].split(b',') # remove checksum, split arguments
if len(args) == 7:
args[0] = args[0][-1] == ord(b'O') # is TX message?
args[1] = int(args[1]) # number of sentences in message
args[2] = int(args[2]) # index of sentence in sequence
args[4] = args[4] == b'B' # is channel B used?
args[6] = int(args[6]) # number of fill bits
if args[1] == 1 and args[2] == 1: # process only single-sentence messages
ais_process(*args)
return False
n0183.rxcallback(0, rx_ais, '!AIVDM') # received AIS
n0183.rxcallback(1, rx_ais, '!AIVDO') # transmitted AIS
NMEA 2000 USB Gateway, RAW protocol
There is no need to purchase our USB Gateway or NMEA 2000 Wi-Fi Gateway just to use it with the CAN Log Viewer. This program is a bi-directional converter from NMEA 2000 protocol to RAW format, which is also used in the Expedition 10 navigation software.
import sys
def tx_clbk(can, data, msgid, state, unique):
try:
if state == can.TX_DONE: # frame sent successfuly
print(str(toraw(data, msgid, True), 'utf-8'), end='')
except:
pass # don't print possible errors, they can break PC application
def rx_clbk(n2k, data, msgid, seq=-1):
try:
print(str(toraw(data, msgid, False), 'utf-8'), end='')
except:
pass # don't print possible errors, they can break PC application
def state_clbk(state):
if state == ydpg.USB_COM_PORT:
can.txcallback(tx_clbk)
n2k.rxcallback(0, rx_clbk)
led.state(led.GREEN) # connected to PC application, set 6th flash green
else:
n2k.rxcallback(0,None) # removing callbacks to prevent writing to COM port
can.txcallback(None)
ydpg.usbclear() # erase old messages
led.state(led.RED) # disconnected from PC application, set 6th flash red
n2k.assemble(False) # call rx_clbk for separate frames only
n2k.loopback(True) # enables processing of sent messages (required for ISO Requests)
state_clbk(ydpg.usbstate()) # call USB callback one time to set callbacks and LED color
ydpg.usbcallback(state_clbk) # register callback on USB state change
# main loop to process messages sent by PC application
while True:
try:
# receives line from PC application and splits it into arguments
args = sys.stdin.readline().rstrip().split(' ')
# remove possible 'start of text' character
# (sent when PC application is connecting to YDPG)
args[0] = args[0].lstrip('\x02')
# parses HEX message ID and replaces source address
msgid = (int(args[0], 16) & 0x1FFFFF00) | n2k.address()
# turns HEX data into byte buffer
data = bytes([int(i, 16) for i in args[1:]])
# sends frame to NMEA 2000 bus
n2k.send(data, msgid, type = n2k.SINGLE)
except KeyboardInterrupt:
break # exit program by pressing Ctrl+C
except:
pass # don't print possible errors, they can break PC application
# returns YDPG-01 to default state
ydpg.usbcallback(None)
n2k.rxcallback(0, None)
can.txcallback(None)
n2k.loopback(False)
n2k.assemble(True)
led.state(led.OFF)
del state_clbk
del tx_clbk
del rx_clbk
BRP Rotax engine gateway, 11-bit CAN frames
This program demonstrates the handling of 11-bit CAN frames using the BRP Rotax (Bombardier Recreational Products) engine protocol as an example. It supports two engines and converts engine RPM and boost pressure to NMEA 2000. If you need a converter that supports more data types, see our Engine Gateway.
import struct
import time
led.state(led.RED) # set default color of 6th flash
engine_updated = [False, False]
engine_kPa = [0, 0] # boost pressure in NMEA2000 format
engine_rpm = [0, 0] # engine speed in NMEA2000 format
def rx_rotax(can, fifo):
global engine_updated, engine_kPa, engine_rpm
msgid, std, rtr, fmi, data = can.recv(1) # get received CAN frame
engine_idx = int((msgid & 0x0F0) == 0x000)
if msgid == 0x300 or msgid == 0x3A0: # boost pressure received
engine_kPa[engine_idx] = data[5] * 10
elif msgid == 0x102 or msgid == 0x1A2: # engine speed received
engine_rpm[engine_idx] = (data[1] | (data[2] << 8)) // 2
engine_updated[engine_idx] = True # set 'received' flag
can.setfilter(1, can.LIST16, 1, (0x102, 0x1A2, 0x300, 0x3A0)) # allow specified id
can.rxcallback(1, rx_rotax) # register callback on reception of CAN frame
engine_data = bytearray(8) # frame data for PGN "Engine Parameters, Rapid Update"
while True:
time.sleep(0.1) # PGN transmitted every 100 ms for each engine
for idx in range(2):
if engine_updated[idx]: # is engine data received?
engine_updated[idx] = False # clear 'received' flag
struct.pack_into(b'<BHHbH', engine_data, 0,
idx, # engine instance
engine_rpm[idx], # engine speed
engine_kPa[idx], # engine boost pressure
0x7F, # engine tilt/trim
0xFFFF) # reserved
_ = n2k.sendpgn(engine_data, 127488) # send PGN "Engine Parameters, Rapid Update"
led.green() # indicate successful conversion