Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial pass at CCSDS uplink plugins #179

Draft
wants to merge 2 commits into
base: devel
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/actions/spelling/expect.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ amsmath
ANamespace
Anps
api
apid
argcomplete
argdesc
arglist
Expand Down Expand Up @@ -69,6 +70,7 @@ cata
cbl
CCB
CCFF
ccsds
Cdioux
cdn
cdxoefg
Expand Down Expand Up @@ -105,6 +107,7 @@ configs
Consolas
cpp
CPython
crcmod
creatingdocsetswithdoxygen
css
csum
Expand Down Expand Up @@ -577,6 +580,7 @@ rtd
rtf
saba
savefig
scid
scm
scrollable
scrollbar
Expand Down Expand Up @@ -608,6 +612,7 @@ Smath
SNDHWM
socketserver
sorttable
spacepacket
spam
sphinxcontrib
splitext
Expand Down Expand Up @@ -734,6 +739,7 @@ validator
vals
valuemin
valuenow
vcid
Vcs
venv
versionchanged
Expand Down
8 changes: 8 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ classifiers = [
"License :: OSI Approved :: Apache Software License",
]
dependencies = [
"spacepackets>=0.23.0",
"pluggy>=1.3.0",
"flask>=3.0.0",
"flask_compress>=1.11",
"pyzmq>=24.0.1",
Expand Down Expand Up @@ -62,6 +64,12 @@ fprime-seqgen = "fprime_gds.common.tools.seqgen:main"
fprime-dp-write = "fprime_gds.executables.data_product_writer:main"
fprime-gds = "fprime_gds.executables.run_deployment:main"

# Automatically provided plugins
[project.entry-points.fprime_gds]
space_packet = "fprime_gds.plugins.framing.ccsds:SpacePacketFramerDeframer"
space_data_link = "fprime_gds.plugins.framing.ccsds:SpaceDataLinkFramerDeframer"
space_packet_space_data_link = "fprime_gds.plugins.framing.ccsds:SpacePacketSpaceDataLinkFramerDeframer"

# For Pytest fixtures
[project.entry-points."pytest11"]
fprime_test_api = "fprime_gds.common.testing_fw.pytest_integration"
Expand Down
Empty file.
20 changes: 20 additions & 0 deletions src/fprime_gds/plugins/framing/apid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
""" fprime_gds.plugins.framing.apid: APID mapping functions for F´ data """
from fprime_gds.common.utils.data_desc_type import DataDescType
from fprime.common.models.serialize.numerical_types import U32Type

import struct

Check notice

Code scanning / CodeQL

Unused import Note

Import of 'struct' is not used.

class APID(object):
""" APID implementations """

@classmethod
def from_type(cls, data_type: DataDescType):
""" Map from data description type to APID """
return data_type.value

@classmethod
def from_data(cls, data):
""" Map from data bytes to APID """
u32_type = U32Type()
u32_type.deserialize(data, offset=0)
return cls.from_type(DataDescType(u32_type.val))
216 changes: 216 additions & 0 deletions src/fprime_gds/plugins/framing/ccsds.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
""" fprime_gds.plugins.framing.ccsds: implementation of framing plugin to support CCSDS

This file registers a CCSDS plugin and a space-packet plugin used to frame data for use transmitting F´ data within a
CCSDS frame.
"""
import struct
from typing import List, Type

from spacepackets.ccsds.spacepacket import SpacePacketHeader, PacketType, SpacePacket

from fprime.common.models.serialize.numerical_types import U32Type

Check notice

Code scanning / CodeQL

Unused import Note

Import of 'U32Type' is not used.
from fprime_gds.common.communication.framing import FramerDeframer, FpFramerDeframer
from fprime_gds.plugin.definitions import gds_plugin_implementation

from fprime_gds.plugins.framing.chain import ChainedFramerDeframer

from fprime_gds.plugins.framing.apid import APID

from crcmod.predefined import PredefinedCrc


class SpacePacketFramerDeframer(FramerDeframer):
""" Concrete implementation of FramerDeframer supporting CCSDS space packets

This implementation is registered as a "framing" plugin to support CCSDS space packets within the GDS layer.
"""
SEQUENCE_NUMBER_MAXIMUM = 16384

def __init__(self):
self.sequence_number = 0

def frame(self, data):
""" Frame the supplied data in a SpacePacket frame

Args:
data: data to frame
Return:
space packet bytes
"""
space_header = SpacePacketHeader(packet_type=PacketType.TC,
apid=APID.from_data(data),
seq_count=self.get_sequence_number(),
data_len=len(data))
space_packet = SpacePacket(space_header, sec_header=None, user_data=data)
return space_packet.pack()

def deframe(self, data, no_copy=False):
""" No op deframe step """
return data, b"", b""

def get_sequence_number(self):
""" Get the sequence number and increment

This function will return the current sequence number and then increment the sequence number for the next round.

Return:
current sequence number
"""
sequence = self.sequence_number
self.sequence_number = (self.sequence_number + 1) % self.SEQUENCE_NUMBER_MAXIMUM
return sequence

@classmethod
def get_name(cls):
""" Name of this implementation provided to CLI """
return "raw-space-packet"

@classmethod
@gds_plugin_implementation
def register_framing_plugin(cls):
""" Register the MyPlugin plugin """
return cls


class SpaceDataLinkFramerDeframer(SpacePacketFramerDeframer):
""" CCSDS space data link Framer/Deframer Implementation """
SEQUENCE_NUMBER_MAXIMUM = 256
HEADER_SIZE = 5

def __init__(self, scid, vcid):
""" """
self.scid = scid
self.vcid = vcid
self.sequence_number = 0
# Note, fprime is used for downlink at this time
self.fprime_framer_deframer = FpFramerDeframer("crc32")

def frame(self, data):
""" Frame the supplied data in an CCSDS space data link packet frame

Args:
data: data to frame
Return:
space data link packet bytes
"""
space_packet_bytes = data
length = len(space_packet_bytes)
assert length < (pow(2, 10) - 1), "Length too-large for CCSDS format"

# CCSDS TC Header:
# 2b - 00 - TF version number
# 1b - 0/1 - 0 enable FARM checks, 1 bypass FARM
# 1b - 0/1 - 0 Type-D data, 1 Type-C data
# 2b - 00 - Reserved
# 10b - XX - Spacecraft id
# 6b - XX - Virtual Channel ID
# 10b - XX - Frame length

# 8b - XX - Frame sequence number

header = (0 << 30) | \
(0 << 29) | \
(0 << 28) | \
((self.scid & 0x3FF) << 16) | \
((self.vcid & 0x3F) << 10) | \
(length & 0x3FF)

header_bytes = struct.pack(">IB", header, self.sequence_number)
assert len(header_bytes) == self.HEADER_SIZE, "CCSDS primary header must be 5 octets long"
full_bytes_no_crc = header_bytes + space_packet_bytes
assert len(full_bytes_no_crc) == self.HEADER_SIZE + length, "Malformed packet generated"

# Use CRC-16 (CCITT) with no final XOR (XOR of 0x0000)
crc_calculator = PredefinedCrc(crc_name="crc-ccitt-false")
crc_calculator.update(full_bytes_no_crc)

full_bytes = full_bytes_no_crc + struct.pack(">H", crc_calculator.crcValue)
return full_bytes

def get_sequence_number(self):
""" Get the sequence number and increment

This function will return the current sequence number and then increment the sequence number for the next round.

Return:
current sequence number
"""
sequence = self.sequence_number
self.sequence_number = (self.sequence_number + 1) % self.SEQUENCE_NUMBER_MAXIMUM
return sequence

def deframe(self, data, no_copy=False):
""" Deframe using fprime for now """
return self.fprime_framer_deframer.deframe(data, no_copy)

@classmethod
def get_arguments(cls):
""" Arguments to request from the CLI """
return {
("--scid", ): {
"type": lambda input_arg: int(input_arg, 0),
"help": "Spacecraft ID"
},
("--vcid",): {
"type": lambda input_arg: int(input_arg, 0),
"help": "Virtual channel ID"
}
}

@classmethod
def check_arguments(cls, scid, vcid):
""" Check arguments from the CLI

Confirms that the input arguments are valid for this framer/deframer.

Args:
scid: spacecraft id
vcid: virtual channel id
"""
if scid is None:
raise TypeError(f"Spacecraft ID not specified")
if scid < 0:
raise TypeError(f"Spacecraft ID {scid} is negative")
if scid > 0x3FF:
raise TypeError(f"Spacecraft ID {scid} is larger than {0x3FF}")

if vcid is None:
raise TypeError(f"Virtual Channel ID not specified")
if vcid < 0:
raise TypeError(f"Virtual Channel ID {vcid} is negative")
if vcid > 0x3F:
raise TypeError(f"Virtual Channel ID {vcid} is larger than {0x3FF}")

@classmethod
def get_name(cls):
""" Name of this implementation provided to CLI """
return "unspecified-space-data-link"

@classmethod
@gds_plugin_implementation
def register_framing_plugin(cls):
""" Register the MyPlugin plugin """
return cls


class SpacePacketSpaceDataLinkFramerDeframer(ChainedFramerDeframer):
""" Space Data Link Protocol framing and deframing that has a data unit of Space Packets """

@classmethod
def get_composites(cls) -> List[Type[FramerDeframer]]:
""" Return the composite list of this """
return [
SpacePacketFramerDeframer,
SpaceDataLinkFramerDeframer
]

@classmethod
def get_name(cls):
""" Name of this implementation provided to CLI """
return "space-packet-space-data-link"

@classmethod
@gds_plugin_implementation
def register_framing_plugin(cls):
""" Register the MyPlugin plugin """
return cls
82 changes: 82 additions & 0 deletions src/fprime_gds/plugins/framing/chain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
""" fprime_gds.plugins.framing.chain: implementation of a chained framer/deframer """
from abc import ABC, abstractmethod
from functools import reduce
from typing import Any, Dict, List, Type
from fprime_gds.common.communication.framing import FramerDeframer


class ChainedFramerDeframer(FramerDeframer, ABC):
""" Framer/deframer that is a composite of chained framer/deframers

This Framer/Deframer will wrap a set of framer/deframers where the result of the frame and deframe options will pass
from one to the other subsequently. The order is specified via the framing path and deframing will use the reverse
order from specified.
"""
def __init__(self, **kwargs):
""" Initialize the chained framer/deframer from a framing-ordered set of children """
frame_order_framer_deframers = [
composite(**self.get_argument_subset(composite, kwargs))
for composite in self.get_composites()
]
self.framers = frame_order_framer_deframers[::1]
self.deframers = frame_order_framer_deframers[::-1]

@classmethod
@abstractmethod
def get_composites(cls) -> List[Type[FramerDeframer]]:
""" Return a list of composites """
raise NotImplementedError(f"Subclasses of {cls.__name__} must implement get_composites")

@staticmethod
def get_argument_subset(composite: Type[FramerDeframer], argument_dictionary: Dict[str, Any]) -> Dict[str, Any]:
""" Get an argument subset that is needed by composite

For the composite, find the set of arguments that is needed by this composite and pull those out of the complete
argument dictionary.

Args:
composite: class of a subtype of FramerDeframer
argument_dictionary: dictionary of all input arguments
"""
if not hasattr(composite, "get_arguments"):
return {}
needed_arguments = composite.get_arguments()
needed_argument_destinations = [
description["destination"] if "destination" in description else
[dash_dash for dash_dash in flag if dash_dash.startswith("--")][0].lstrip("-").replace("-", "_")
for flag, description in needed_arguments.items()
]
return {name: argument_dictionary[name] for name in needed_argument_destinations}

@classmethod
def get_arguments(cls):
""" Arguments to request from the CLI """
all_arguments = {}
for composite in cls.get_composites():
all_arguments.update(composite.get_arguments() if hasattr(composite, "get_arguments") else {})
return all_arguments

@classmethod
def check_arguments(cls, **kwargs):
""" Check arguments from the CLI """
for composite in cls.get_composites():
subset_arguments = cls.get_argument_subset(composite, kwargs)
if hasattr(composite, "check_arguments"):
composite.check_arguments(**subset_arguments)

def deframe(self, data, no_copy=False):
""" Deframe via a chain of children deframers """
packet = data[:] if not no_copy else data
remaining = None
discarded = b""

for deframer in self.deframers:
new_packet, new_remaining, new_discarded = deframer.deframe(packet, True)
discarded += new_discarded
remaining = new_remaining if remaining is None else remaining
packet = new_packet
return packet, remaining, discarded

def frame(self, data):
""" Frame via a chain of children framers """
return reduce(lambda framed_data, framer: framer.frame(framed_data), self.framers, data)
Loading