diff --git a/credsweeper/filters/value_base64_data_check.py b/credsweeper/filters/value_base64_data_check.py index 736c86009..bd3295222 100644 --- a/credsweeper/filters/value_base64_data_check.py +++ b/credsweeper/filters/value_base64_data_check.py @@ -1,4 +1,3 @@ -import base64 import contextlib import string @@ -40,14 +39,6 @@ def run(self, line_data: LineData, target: AnalysisTarget) -> bool: return True # check whether decoded bytes have enough entropy with contextlib.suppress(Exception): - value_len = len(value) - if 0x3 & value_len: - # Bitbucket client id is 18 chars length - pad_len = 4 - (0x3 & value_len) - value = value + ''.join(['='] * pad_len) - if '-' in value or '_' in value: - decoded = base64.urlsafe_b64decode(value) - else: - decoded = base64.standard_b64decode(value) + decoded = Util.decode_base64(value, padding_safe=True, urlsafe_detect=True) return Util.is_ascii_entropy_validate(decoded) return True diff --git a/credsweeper/filters/value_grafana_check.py b/credsweeper/filters/value_grafana_check.py index c9e215fca..3bd583555 100644 --- a/credsweeper/filters/value_grafana_check.py +++ b/credsweeper/filters/value_grafana_check.py @@ -1,4 +1,3 @@ -import base64 import contextlib import json @@ -6,6 +5,7 @@ from credsweeper.credentials import LineData from credsweeper.file_handler.analysis_target import AnalysisTarget from credsweeper.filters import Filter +from credsweeper.utils import Util class ValueGrafanaCheck(Filter): @@ -30,11 +30,11 @@ def run(self, line_data: LineData, target: AnalysisTarget) -> bool: with contextlib.suppress(Exception): if line_data.value.startswith("glc_"): # Grafana Access Policy Token - decoded = base64.b64decode(line_data.value[4:]) + decoded = Util.decode_base64(line_data.value[4:], padding_safe=True, urlsafe_detect=True) keys = ["o", "n", "k", "m"] else: # Grafana Provisioned API Key - decoded = base64.b64decode(line_data.value) + decoded = Util.decode_base64(line_data.value, padding_safe=True, urlsafe_detect=True) keys = ["n", "k", "id"] if payload := json.loads(decoded): for key in keys: diff --git a/credsweeper/filters/value_json_web_token_check.py b/credsweeper/filters/value_json_web_token_check.py index 0649c694c..a53b05cf8 100644 --- a/credsweeper/filters/value_json_web_token_check.py +++ b/credsweeper/filters/value_json_web_token_check.py @@ -1,4 +1,3 @@ -import base64 import contextlib import json @@ -6,6 +5,7 @@ from credsweeper.credentials import LineData from credsweeper.file_handler.analysis_target import AnalysisTarget from credsweeper.filters import Filter +from credsweeper.utils import Util class ValueJsonWebTokenCheck(Filter): @@ -35,7 +35,7 @@ def run(self, line_data: LineData, target: AnalysisTarget) -> bool: delimiter_pos = line_data.value.find(".") # jwt token. '.' must be always in given data, according regex in rule value = line_data.value[:delimiter_pos] - decoded = base64.b64decode(value) + decoded = Util.decode_base64(value, padding_safe=True, urlsafe_detect=True) if header := json.loads(decoded): if "alg" in header or "typ" in header: return False diff --git a/credsweeper/filters/value_structured_token_check.py b/credsweeper/filters/value_structured_token_check.py index e2ee64f56..161e555b3 100644 --- a/credsweeper/filters/value_structured_token_check.py +++ b/credsweeper/filters/value_structured_token_check.py @@ -1,4 +1,3 @@ -import base64 import binascii import contextlib @@ -55,7 +54,7 @@ def check_crc32_struct(value: str) -> bool: @staticmethod def check_atlassian_struct(value: str) -> bool: """Returns False if value is valid for atlassian structure 'integer:bytes'""" - decoded = base64.b64decode(value) + decoded = Util.decode_base64(value, padding_safe=True, urlsafe_detect=True) delimiter_pos = decoded.find(b':') # there is limit for big integer value: math.log10(1<<64) = 19.265919722494797 if 0 < delimiter_pos <= 20: diff --git a/credsweeper/utils/util.py b/credsweeper/utils/util.py index 20d3fc760..7dc4dfea7 100644 --- a/credsweeper/utils/util.py +++ b/credsweeper/utils/util.py @@ -1,4 +1,5 @@ import ast +import base64 import json import logging import math @@ -413,13 +414,15 @@ def is_asn1(data: bytes) -> bool: byte_len = (0x7F & length) if 0x80 == length and data.endswith(b"\x00\x00"): return True - elif 0x80 < length and byte_len < data_length: # additional check + elif 0x80 < length and 1 < byte_len < data_length: # additional check len_bytes = data[2:2 + byte_len] try: long_size = struct.unpack(">h", len_bytes) except struct.error: long_size = (-1,) # yapf: disable length = long_size[0] + elif 0x80 < length and 1 == byte_len: # small size + length = data[2] else: byte_len = 0 return data_length == length + 2 + byte_len @@ -613,3 +616,17 @@ def parse_python(source: str) -> List[Any]: src = ast.parse(source) result = Util.ast_to_dict(src) return result + + @staticmethod + def decode_base64(text: str, padding_safe: bool = False, urlsafe_detect=False) -> bytes: + """decode text to bytes with / without padding detect and urlsafe symbols""" + value = text + if padding_safe: + pad_num = 0x3 & len(value) + if pad_num: + value += '=' * (4 - pad_num) + if urlsafe_detect and ('-' in value or '_' in value): + decoded = base64.b64decode(value, altchars=b"-_", validate=True) + else: + decoded = base64.b64decode(value, validate=True) + return decoded diff --git a/tests/filters/test_value_base64_data_check.py b/tests/filters/test_value_base64_data_check.py index 0847c6b75..3914e3b90 100644 --- a/tests/filters/test_value_base64_data_check.py +++ b/tests/filters/test_value_base64_data_check.py @@ -12,7 +12,7 @@ def test_value_entropy_check_p(self, file_path: pytest.fixture, line: str) -> No line_data = get_line_data(file_path, line=line, pattern=LINE_VALUE_PATTERN) assert ValueBase64DataCheck().run(line_data, DUMMY_ANALYSIS_TARGET) is False - @pytest.mark.parametrize("line", ["eyJ0eXAiOiJKV1QiLC", "2AA219GG746F88F6DDA0D852A0FD3211"]) + @pytest.mark.parametrize("line", ["eyJ0eXAiOiJKV1QiLC", "2AA219GG746F88F6DDA0D852A0FD3211", "!@#$%^&*(_)0aA"]) def test_value_entropy_check_n(self, file_path: pytest.fixture, line: str) -> None: line_data = get_line_data(file_path, line=line, pattern=LINE_VALUE_PATTERN) assert ValueBase64DataCheck().run(line_data, DUMMY_ANALYSIS_TARGET) is True diff --git a/tests/utils/test_util.py b/tests/utils/test_util.py index 80e0c4f70..38d501197 100644 --- a/tests/utils/test_util.py +++ b/tests/utils/test_util.py @@ -1,4 +1,5 @@ import base64 +import binascii import os import random import string @@ -511,3 +512,24 @@ def test_parse_py_n(self): # wrong syntax with self.assertRaises(SyntaxError): self.assertFalse(Util.parse_python(""""Hello World!"""")) + + def test_decode_base64_p(self): + self.assertEqual(AZ_DATA, Util.decode_base64("VGhlIHF1aWNrIGJyb3duIGZveCBqdW1wcyBvdmVyIHRoZSBsYXp5IGRvZw==")) + self.assertEqual(b"\xFF\xFF\xFF", Util.decode_base64("////")) + self.assertEqual(b"\xFB\xEF\xBE", Util.decode_base64("++++")) + self.assertEqual(b"\xFF\xFF\xFF", Util.decode_base64("____", urlsafe_detect=True)) + self.assertEqual(b"\xFB\xEF\xBE", Util.decode_base64("----", urlsafe_detect=True)) + self.assertEqual(b"\xFF\xFE", Util.decode_base64("//4", padding_safe=True)) + self.assertEqual(b"\xFF\xFE", Util.decode_base64("__4", padding_safe=True, urlsafe_detect=True)) + + def test_decode_base64_n(self): + with self.assertRaises(binascii.Error): + Util.decode_base64("VGhlIHF1aWNrIGJyb3duIGZveCBqdW1wcyBvdmVyIHRoZSBsYXp5IGRvZw") + with self.assertRaises(binascii.Error): + Util.decode_base64("-_+_-", padding_safe=True, urlsafe_detect=True) + with self.assertRaises(binascii.Error): + Util.decode_base64("/** ! */", urlsafe_detect=True) + with self.assertRaises(binascii.Error): + Util.decode_base64("____") + with self.assertRaises(binascii.Error): + Util.decode_base64("----")