From a9b935986c09a97883658277d530e9e415cadb82 Mon Sep 17 00:00:00 2001 From: Lalo Teijeiro Date: Wed, 20 Dec 2023 14:39:22 -0800 Subject: [PATCH] Allow different PDP contexts --- Hologram/Network/Modem/Modem.py | 25 ++++++++++++++----- Hologram/Network/Modem/Quectel.py | 12 +++++----- tests/Modem/test_BG96.py | 24 ++++++++++++++++++- tests/Modem/test_Modem.py | 40 +++++++++++++++++++++++++++++++ 4 files changed, 88 insertions(+), 13 deletions(-) diff --git a/Hologram/Network/Modem/Modem.py b/Hologram/Network/Modem/Modem.py index 45f0de7..43a194e 100644 --- a/Hologram/Network/Modem/Modem.py +++ b/Hologram/Network/Modem/Modem.py @@ -13,13 +13,12 @@ from UtilClasses import ModemResult from UtilClasses import SMS from Hologram.Event import Event -from Exceptions.HologramError import SerialError, HologramError, NetworkError, PPPError +from Exceptions.HologramError import SerialError, NetworkError, PPPError from collections import deque import binascii import datetime -import logging import os import serial from serial.tools import list_ports @@ -35,6 +34,7 @@ class Modem(IModem): DEFAULT_SERIAL_TIMEOUT = 1 DEFAULT_SERIAL_RETRIES = 0 DEFAULT_SEND_TIMEOUT = 10 + DEFAULT_PDP_CONTEXT = 1 _RETRY_DELAY = 0.05 # 50 millisecond delay to avoid spinning loops @@ -58,8 +58,8 @@ class Modem(IModem): } # The device_name is the same as the serial port, only provide a device_name if you dont want it to be autodectected - def __init__(self, device_name=None, baud_rate='9600', - chatscript_file=None, event=Event()): + def __init__(self, device_name=None, baud_rate='9600', chatscript_file=None, + event=Event(), apn='hologram', pdp_context=1): super().__init__(device_name=device_name, baud_rate=baud_rate, event=event) @@ -75,7 +75,8 @@ def __init__(self, device_name=None, baud_rate='9600', self.result = ModemResult.OK self.debug_out = '' self.in_ext = False - self._apn = 'hologram' + self._apn = apn + self._pdp_context = pdp_context self._initialize_device_name(device_name) @@ -741,6 +742,10 @@ def _is_pdp_context_active(self): def _set_up_pdp_context(self): if self._is_pdp_context_active(): return True self.logger.info('Setting up PDP context') + + if self._pdp_context != Modem.DEFAULT_PDP_CONTEXT: + self.set('+UPSD', f'0,100,{self._pdp_context}') + self.set('+UPSD', f'0,1,\"{self._apn}\"') self.set('+UPSD', '0,7,\"0.0.0.0\"') ok, _ = self.set('+UPSDA', '0,3', timeout=30) @@ -973,4 +978,12 @@ def apn(self): @apn.setter def apn(self, apn): self._apn = apn - return self.set('+CGDCONT', f'1,"IP","{self._apn}"') + return self.set('+CGDCONT', f'{self._pdp_context},"IP","{self._apn}"') + + @property + def pdp_context(self): + return self._pdp_context + + @pdp_context.setter + def pdp_context(self, pdp_context): + self._pdp_context = pdp_context diff --git a/Hologram/Network/Modem/Quectel.py b/Hologram/Network/Modem/Quectel.py index d996dd0..7b0f4c7 100644 --- a/Hologram/Network/Modem/Quectel.py +++ b/Hologram/Network/Modem/Quectel.py @@ -19,11 +19,11 @@ class Quectel(Modem): - def __init__(self, device_name=None, baud_rate='9600', - chatscript_file=None, event=Event()): + def __init__(self, device_name=None, baud_rate='9600', chatscript_file=None, + event=Event(), apn='hologram', pdp_context=1): - super().__init__(device_name=device_name, baud_rate=baud_rate, - chatscript_file=chatscript_file, event=event) + super().__init__(device_name=device_name, baud_rate=baud_rate, chatscript_file=chatscript_file, + event=event, apn=apn, pdp_context=pdp_context) self._at_sockets_available = True self.urc_response = '' @@ -152,8 +152,8 @@ def set_network_registration_status(self): def _set_up_pdp_context(self): if self._is_pdp_context_active(): return True self.logger.info('Setting up PDP context') - self.set('+QICSGP', f'1,1,\"{self._apn}\",\"\",\"\",1') - ok, _ = self.set('+QIACT', '1', timeout=30) + self.set('+QICSGP', f'{self._pdp_context},1,\"{self._apn}\",\"\",\"\",1') + ok, _ = self.set('+QIACT', f'{self._pdp_context}', timeout=30) if ok != ModemResult.OK: self.logger.error('PDP Context setup failed') raise NetworkError('Failed PDP context setup') diff --git a/tests/Modem/test_BG96.py b/tests/Modem/test_BG96.py index 681d1e0..bc61a64 100644 --- a/tests/Modem/test_BG96.py +++ b/tests/Modem/test_BG96.py @@ -6,7 +6,7 @@ # # test_BG96.py - This file implements unit tests for the BG96 modem interface. -from unittest.mock import patch +from unittest.mock import patch, call import pytest import sys @@ -72,3 +72,25 @@ def test_close_socket(mock_pdp, mock_command, mock_set, no_serial_port): modem.close_socket() mock_set.assert_called_with("+QIACT", "0", timeout=30) mock_command.assert_called_with("+QICLOSE", 1) + +@patch.object(BG96, "set") +def test_set_up_pdp_context_default(mock_set, no_serial_port): + modem = BG96() + mock_set.return_value = (ModemResult.OK, None) + + modem._set_up_pdp_context() + + expected_calls = [call('+QICSGP', '1,1,\"hologram\",\"\",\"\",1'), + call('+QIACT', '1', timeout=30)] + mock_set.assert_has_calls(expected_calls, any_order=True) + +@patch.object(BG96, "set") +def test_set_up_pdp_context_custom_apn_and_pdp_context(mock_set, no_serial_port): + modem = BG96(apn='hologram2', pdp_context=3) + mock_set.return_value = (ModemResult.OK, None) + + modem._set_up_pdp_context() + + expected_calls = [call('+QICSGP', '3,1,\"hologram2\",\"\",\"\",1'), + call('+QIACT', '3', timeout=30)] + mock_set.assert_has_calls(expected_calls, any_order=True) diff --git a/tests/Modem/test_Modem.py b/tests/Modem/test_Modem.py index e290d14..6c83b57 100644 --- a/tests/Modem/test_Modem.py +++ b/tests/Modem/test_Modem.py @@ -6,6 +6,7 @@ # # test_Modem.py - This file implements unit tests for the Modem class. +from unittest.mock import patch, call import pytest import sys from datetime import datetime @@ -47,6 +48,9 @@ def mock_command_sms(modem, at_command): def mock_set_sms(modem, at_command, val): return None +def mock_inactive_pdp_context(modem): + return False + @pytest.fixture def no_serial_port(monkeypatch): monkeypatch.setattr(Modem, '_read_from_serial_port', mock_read) @@ -56,6 +60,7 @@ def no_serial_port(monkeypatch): monkeypatch.setattr(Modem, 'openSerialPort', mock_open_serial_port) monkeypatch.setattr(Modem, 'closeSerialPort', mock_close_serial_port) monkeypatch.setattr(Modem, 'detect_usable_serial_port', mock_detect_usable_serial_port) + monkeypatch.setattr(Modem, '_is_pdp_context_active', mock_inactive_pdp_context) @pytest.fixture def get_sms(monkeypatch): @@ -75,6 +80,8 @@ def test_init_modem_no_args(no_serial_port): assert(modem.chatscript_file.endswith('/chatscripts/default-script')) assert(modem._at_sockets_available == False) assert(modem.description == 'Modem') + assert(modem.apn == 'hologram') + assert(modem.pdp_context == 1) def test_init_modem_chatscriptfileoverride(no_serial_port): modem = Modem(chatscript_file='test-chatscript') @@ -82,6 +89,14 @@ def test_init_modem_chatscriptfileoverride(no_serial_port): assert(modem.socket_identifier == 0) assert(modem.chatscript_file == 'test-chatscript') +def test_init_modem_apn(no_serial_port): + modem = Modem(apn='hologram2') + assert(modem.apn == 'hologram2') + +def test_init_modem_pdp_context(no_serial_port): + modem = Modem(pdp_context=3) + assert(modem.pdp_context == 3) + def test_get_result_string(no_serial_port): modem = Modem() assert(modem.getResultString(0) == 'Modem returned OK') @@ -98,6 +113,31 @@ def test_get_location(no_serial_port): assert(modem.location == 'test location') assert('This modem does not support this property' in str(e)) +@patch.object(Modem, "set") +def test_set_up_pdp_context_default(mock_set, no_serial_port): + modem = Modem() + mock_set.return_value = (ModemResult.OK, None) + + modem._set_up_pdp_context() + + expected_calls = [call('+UPSD', '0,1,\"hologram\"'), + call('+UPSD', '0,7,\"0.0.0.0\"'), + call('+UPSDA', '0,3', timeout=30)] + mock_set.assert_has_calls(expected_calls, any_order=True) + +@patch.object(Modem, "set") +def test_set_up_pdp_context_custom_apn_and_pdp_context(mock_set, no_serial_port): + modem = Modem(apn='hologram2', pdp_context=3) + mock_set.return_value = (ModemResult.OK, None) + + modem._set_up_pdp_context() + + expected_calls = [call('+UPSD', '0,100,3'), + call('+UPSD', '0,1,\"hologram2\"'), + call('+UPSD', '0,7,\"0.0.0.0\"'), + call('+UPSDA', '0,3', timeout=30)] + mock_set.assert_has_calls(expected_calls, any_order=True) + # SMS def test_get_sms(no_serial_port, get_sms):