-
Notifications
You must be signed in to change notification settings - Fork 39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Initial pass at CCSDS uplink plugins #179
Draft
LeStarch
wants to merge
2
commits into
nasa:devel
Choose a base branch
from
LeStarch:ccsds/uplink-plugins
base: devel
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
""" fprime_gds.plugins.framing.apid: APID mapping functions for F´ data """ | ||
from fprime_gds.common.utils.data_desc_type import DataDescType | ||
from fprime.common.models.serialize.numerical_types import U32Type | ||
|
||
import struct | ||
|
||
class APID(object): | ||
""" APID implementations """ | ||
|
||
@classmethod | ||
def from_type(cls, data_type: DataDescType): | ||
""" Map from data description type to APID """ | ||
return data_type.value | ||
|
||
@classmethod | ||
def from_data(cls, data): | ||
""" Map from data bytes to APID """ | ||
u32_type = U32Type() | ||
u32_type.deserialize(data, offset=0) | ||
return cls.from_type(DataDescType(u32_type.val)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,216 @@ | ||
""" fprime_gds.plugins.framing.ccsds: implementation of framing plugin to support CCSDS | ||
|
||
This file registers a CCSDS plugin and a space-packet plugin used to frame data for use transmitting F´ data within a | ||
CCSDS frame. | ||
""" | ||
import struct | ||
from typing import List, Type | ||
|
||
from spacepackets.ccsds.spacepacket import SpacePacketHeader, PacketType, SpacePacket | ||
|
||
from fprime.common.models.serialize.numerical_types import U32Type | ||
Check notice Code scanning / CodeQL Unused import Note
Import of 'U32Type' is not used.
|
||
from fprime_gds.common.communication.framing import FramerDeframer, FpFramerDeframer | ||
from fprime_gds.plugin.definitions import gds_plugin_implementation | ||
|
||
from fprime_gds.plugins.framing.chain import ChainedFramerDeframer | ||
|
||
from fprime_gds.plugins.framing.apid import APID | ||
|
||
from crcmod.predefined import PredefinedCrc | ||
|
||
|
||
class SpacePacketFramerDeframer(FramerDeframer): | ||
""" Concrete implementation of FramerDeframer supporting CCSDS space packets | ||
|
||
This implementation is registered as a "framing" plugin to support CCSDS space packets within the GDS layer. | ||
""" | ||
SEQUENCE_NUMBER_MAXIMUM = 16384 | ||
|
||
def __init__(self): | ||
self.sequence_number = 0 | ||
|
||
def frame(self, data): | ||
""" Frame the supplied data in a SpacePacket frame | ||
|
||
Args: | ||
data: data to frame | ||
Return: | ||
space packet bytes | ||
""" | ||
space_header = SpacePacketHeader(packet_type=PacketType.TC, | ||
apid=APID.from_data(data), | ||
seq_count=self.get_sequence_number(), | ||
data_len=len(data)) | ||
space_packet = SpacePacket(space_header, sec_header=None, user_data=data) | ||
return space_packet.pack() | ||
|
||
def deframe(self, data, no_copy=False): | ||
""" No op deframe step """ | ||
return data, b"", b"" | ||
|
||
def get_sequence_number(self): | ||
""" Get the sequence number and increment | ||
|
||
This function will return the current sequence number and then increment the sequence number for the next round. | ||
|
||
Return: | ||
current sequence number | ||
""" | ||
sequence = self.sequence_number | ||
self.sequence_number = (self.sequence_number + 1) % self.SEQUENCE_NUMBER_MAXIMUM | ||
return sequence | ||
|
||
@classmethod | ||
def get_name(cls): | ||
""" Name of this implementation provided to CLI """ | ||
return "raw-space-packet" | ||
|
||
@classmethod | ||
@gds_plugin_implementation | ||
def register_framing_plugin(cls): | ||
""" Register the MyPlugin plugin """ | ||
return cls | ||
|
||
|
||
class SpaceDataLinkFramerDeframer(SpacePacketFramerDeframer): | ||
""" CCSDS space data link Framer/Deframer Implementation """ | ||
SEQUENCE_NUMBER_MAXIMUM = 256 | ||
HEADER_SIZE = 5 | ||
|
||
def __init__(self, scid, vcid): | ||
""" """ | ||
self.scid = scid | ||
self.vcid = vcid | ||
self.sequence_number = 0 | ||
# Note, fprime is used for downlink at this time | ||
self.fprime_framer_deframer = FpFramerDeframer("crc32") | ||
|
||
def frame(self, data): | ||
""" Frame the supplied data in an CCSDS space data link packet frame | ||
|
||
Args: | ||
data: data to frame | ||
Return: | ||
space data link packet bytes | ||
""" | ||
space_packet_bytes = data | ||
length = len(space_packet_bytes) | ||
assert length < (pow(2, 10) - 1), "Length too-large for CCSDS format" | ||
|
||
# CCSDS TC Header: | ||
# 2b - 00 - TF version number | ||
# 1b - 0/1 - 0 enable FARM checks, 1 bypass FARM | ||
# 1b - 0/1 - 0 Type-D data, 1 Type-C data | ||
# 2b - 00 - Reserved | ||
# 10b - XX - Spacecraft id | ||
# 6b - XX - Virtual Channel ID | ||
# 10b - XX - Frame length | ||
|
||
# 8b - XX - Frame sequence number | ||
|
||
header = (0 << 30) | \ | ||
(0 << 29) | \ | ||
(0 << 28) | \ | ||
((self.scid & 0x3FF) << 16) | \ | ||
((self.vcid & 0x3F) << 10) | \ | ||
(length & 0x3FF) | ||
|
||
header_bytes = struct.pack(">IB", header, self.sequence_number) | ||
assert len(header_bytes) == self.HEADER_SIZE, "CCSDS primary header must be 5 octets long" | ||
full_bytes_no_crc = header_bytes + space_packet_bytes | ||
assert len(full_bytes_no_crc) == self.HEADER_SIZE + length, "Malformed packet generated" | ||
|
||
# Use CRC-16 (CCITT) with no final XOR (XOR of 0x0000) | ||
crc_calculator = PredefinedCrc(crc_name="crc-ccitt-false") | ||
crc_calculator.update(full_bytes_no_crc) | ||
|
||
full_bytes = full_bytes_no_crc + struct.pack(">H", crc_calculator.crcValue) | ||
return full_bytes | ||
|
||
def get_sequence_number(self): | ||
""" Get the sequence number and increment | ||
|
||
This function will return the current sequence number and then increment the sequence number for the next round. | ||
|
||
Return: | ||
current sequence number | ||
""" | ||
sequence = self.sequence_number | ||
self.sequence_number = (self.sequence_number + 1) % self.SEQUENCE_NUMBER_MAXIMUM | ||
return sequence | ||
|
||
def deframe(self, data, no_copy=False): | ||
""" Deframe using fprime for now """ | ||
return self.fprime_framer_deframer.deframe(data, no_copy) | ||
|
||
@classmethod | ||
def get_arguments(cls): | ||
""" Arguments to request from the CLI """ | ||
return { | ||
("--scid", ): { | ||
"type": lambda input_arg: int(input_arg, 0), | ||
"help": "Spacecraft ID" | ||
}, | ||
("--vcid",): { | ||
"type": lambda input_arg: int(input_arg, 0), | ||
"help": "Virtual channel ID" | ||
} | ||
} | ||
|
||
@classmethod | ||
def check_arguments(cls, scid, vcid): | ||
""" Check arguments from the CLI | ||
|
||
Confirms that the input arguments are valid for this framer/deframer. | ||
|
||
Args: | ||
scid: spacecraft id | ||
vcid: virtual channel id | ||
""" | ||
if scid is None: | ||
raise TypeError(f"Spacecraft ID not specified") | ||
if scid < 0: | ||
raise TypeError(f"Spacecraft ID {scid} is negative") | ||
if scid > 0x3FF: | ||
raise TypeError(f"Spacecraft ID {scid} is larger than {0x3FF}") | ||
|
||
if vcid is None: | ||
raise TypeError(f"Virtual Channel ID not specified") | ||
if vcid < 0: | ||
raise TypeError(f"Virtual Channel ID {vcid} is negative") | ||
if vcid > 0x3F: | ||
raise TypeError(f"Virtual Channel ID {vcid} is larger than {0x3FF}") | ||
|
||
@classmethod | ||
def get_name(cls): | ||
""" Name of this implementation provided to CLI """ | ||
return "unspecified-space-data-link" | ||
|
||
@classmethod | ||
@gds_plugin_implementation | ||
def register_framing_plugin(cls): | ||
""" Register the MyPlugin plugin """ | ||
return cls | ||
|
||
|
||
class SpacePacketSpaceDataLinkFramerDeframer(ChainedFramerDeframer): | ||
""" Space Data Link Protocol framing and deframing that has a data unit of Space Packets """ | ||
|
||
@classmethod | ||
def get_composites(cls) -> List[Type[FramerDeframer]]: | ||
""" Return the composite list of this """ | ||
return [ | ||
SpacePacketFramerDeframer, | ||
SpaceDataLinkFramerDeframer | ||
] | ||
|
||
@classmethod | ||
def get_name(cls): | ||
""" Name of this implementation provided to CLI """ | ||
return "space-packet-space-data-link" | ||
|
||
@classmethod | ||
@gds_plugin_implementation | ||
def register_framing_plugin(cls): | ||
""" Register the MyPlugin plugin """ | ||
return cls |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
""" fprime_gds.plugins.framing.chain: implementation of a chained framer/deframer """ | ||
from abc import ABC, abstractmethod | ||
from functools import reduce | ||
from typing import Any, Dict, List, Type | ||
from fprime_gds.common.communication.framing import FramerDeframer | ||
|
||
|
||
class ChainedFramerDeframer(FramerDeframer, ABC): | ||
""" Framer/deframer that is a composite of chained framer/deframers | ||
|
||
This Framer/Deframer will wrap a set of framer/deframers where the result of the frame and deframe options will pass | ||
from one to the other subsequently. The order is specified via the framing path and deframing will use the reverse | ||
order from specified. | ||
""" | ||
def __init__(self, **kwargs): | ||
""" Initialize the chained framer/deframer from a framing-ordered set of children """ | ||
frame_order_framer_deframers = [ | ||
composite(**self.get_argument_subset(composite, kwargs)) | ||
for composite in self.get_composites() | ||
] | ||
self.framers = frame_order_framer_deframers[::1] | ||
self.deframers = frame_order_framer_deframers[::-1] | ||
|
||
@classmethod | ||
@abstractmethod | ||
def get_composites(cls) -> List[Type[FramerDeframer]]: | ||
""" Return a list of composites """ | ||
raise NotImplementedError(f"Subclasses of {cls.__name__} must implement get_composites") | ||
|
||
@staticmethod | ||
def get_argument_subset(composite: Type[FramerDeframer], argument_dictionary: Dict[str, Any]) -> Dict[str, Any]: | ||
""" Get an argument subset that is needed by composite | ||
|
||
For the composite, find the set of arguments that is needed by this composite and pull those out of the complete | ||
argument dictionary. | ||
|
||
Args: | ||
composite: class of a subtype of FramerDeframer | ||
argument_dictionary: dictionary of all input arguments | ||
""" | ||
if not hasattr(composite, "get_arguments"): | ||
return {} | ||
needed_arguments = composite.get_arguments() | ||
needed_argument_destinations = [ | ||
description["destination"] if "destination" in description else | ||
[dash_dash for dash_dash in flag if dash_dash.startswith("--")][0].lstrip("-").replace("-", "_") | ||
for flag, description in needed_arguments.items() | ||
] | ||
return {name: argument_dictionary[name] for name in needed_argument_destinations} | ||
|
||
@classmethod | ||
def get_arguments(cls): | ||
""" Arguments to request from the CLI """ | ||
all_arguments = {} | ||
for composite in cls.get_composites(): | ||
all_arguments.update(composite.get_arguments() if hasattr(composite, "get_arguments") else {}) | ||
return all_arguments | ||
|
||
@classmethod | ||
def check_arguments(cls, **kwargs): | ||
""" Check arguments from the CLI """ | ||
for composite in cls.get_composites(): | ||
subset_arguments = cls.get_argument_subset(composite, kwargs) | ||
if hasattr(composite, "check_arguments"): | ||
composite.check_arguments(**subset_arguments) | ||
|
||
def deframe(self, data, no_copy=False): | ||
""" Deframe via a chain of children deframers """ | ||
packet = data[:] if not no_copy else data | ||
remaining = None | ||
discarded = b"" | ||
|
||
for deframer in self.deframers: | ||
new_packet, new_remaining, new_discarded = deframer.deframe(packet, True) | ||
discarded += new_discarded | ||
remaining = new_remaining if remaining is None else remaining | ||
packet = new_packet | ||
return packet, remaining, discarded | ||
|
||
def frame(self, data): | ||
""" Frame via a chain of children framers """ | ||
return reduce(lambda framed_data, framer: framer.frame(framed_data), self.framers, data) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Check notice
Code scanning / CodeQL
Unused import Note