Skip to content

Commit

Permalink
Separate function for future reusing (#470)
Browse files Browse the repository at this point in the history
* Separate function for future reusing

* flake8 fix

* + UT for coverage

* [no ci] Update credsweeper/utils/util.py

Co-authored-by: Kostiantyn Melnik <[email protected]>

* + UT for base64 url safe

---------

Co-authored-by: Kostiantyn Melnik <[email protected]>
  • Loading branch information
babenek and kmnls authored Dec 5, 2023
1 parent 29c07e4 commit 35be458
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 19 deletions.
11 changes: 1 addition & 10 deletions credsweeper/filters/value_base64_data_check.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import base64
import contextlib
import string

Expand Down Expand Up @@ -40,14 +39,6 @@ def run(self, line_data: LineData, target: AnalysisTarget) -> bool:
return True
# check whether decoded bytes have enough entropy
with contextlib.suppress(Exception):
value_len = len(value)
if 0x3 & value_len:
# Bitbucket client id is 18 chars length
pad_len = 4 - (0x3 & value_len)
value = value + ''.join(['='] * pad_len)
if '-' in value or '_' in value:
decoded = base64.urlsafe_b64decode(value)
else:
decoded = base64.standard_b64decode(value)
decoded = Util.decode_base64(value, padding_safe=True, urlsafe_detect=True)
return Util.is_ascii_entropy_validate(decoded)
return True
6 changes: 3 additions & 3 deletions credsweeper/filters/value_grafana_check.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import base64
import contextlib
import json

from credsweeper.config import Config
from credsweeper.credentials import LineData
from credsweeper.file_handler.analysis_target import AnalysisTarget
from credsweeper.filters import Filter
from credsweeper.utils import Util


class ValueGrafanaCheck(Filter):
Expand All @@ -30,11 +30,11 @@ def run(self, line_data: LineData, target: AnalysisTarget) -> bool:
with contextlib.suppress(Exception):
if line_data.value.startswith("glc_"):
# Grafana Access Policy Token
decoded = base64.b64decode(line_data.value[4:])
decoded = Util.decode_base64(line_data.value[4:], padding_safe=True, urlsafe_detect=True)
keys = ["o", "n", "k", "m"]
else:
# Grafana Provisioned API Key
decoded = base64.b64decode(line_data.value)
decoded = Util.decode_base64(line_data.value, padding_safe=True, urlsafe_detect=True)
keys = ["n", "k", "id"]
if payload := json.loads(decoded):
for key in keys:
Expand Down
4 changes: 2 additions & 2 deletions credsweeper/filters/value_json_web_token_check.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import base64
import contextlib
import json

from credsweeper.config import Config
from credsweeper.credentials import LineData
from credsweeper.file_handler.analysis_target import AnalysisTarget
from credsweeper.filters import Filter
from credsweeper.utils import Util


class ValueJsonWebTokenCheck(Filter):
Expand Down Expand Up @@ -35,7 +35,7 @@ def run(self, line_data: LineData, target: AnalysisTarget) -> bool:
delimiter_pos = line_data.value.find(".")
# jwt token. '.' must be always in given data, according regex in rule
value = line_data.value[:delimiter_pos]
decoded = base64.b64decode(value)
decoded = Util.decode_base64(value, padding_safe=True, urlsafe_detect=True)
if header := json.loads(decoded):
if "alg" in header or "typ" in header:
return False
Expand Down
3 changes: 1 addition & 2 deletions credsweeper/filters/value_structured_token_check.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import base64
import binascii
import contextlib

Expand Down Expand Up @@ -55,7 +54,7 @@ def check_crc32_struct(value: str) -> bool:
@staticmethod
def check_atlassian_struct(value: str) -> bool:
"""Returns False if value is valid for atlassian structure 'integer:bytes'"""
decoded = base64.b64decode(value)
decoded = Util.decode_base64(value, padding_safe=True, urlsafe_detect=True)
delimiter_pos = decoded.find(b':')
# there is limit for big integer value: math.log10(1<<64) = 19.265919722494797
if 0 < delimiter_pos <= 20:
Expand Down
19 changes: 18 additions & 1 deletion credsweeper/utils/util.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import ast
import base64
import json
import logging
import math
Expand Down Expand Up @@ -413,13 +414,15 @@ def is_asn1(data: bytes) -> bool:
byte_len = (0x7F & length)
if 0x80 == length and data.endswith(b"\x00\x00"):
return True
elif 0x80 < length and byte_len < data_length: # additional check
elif 0x80 < length and 1 < byte_len < data_length: # additional check
len_bytes = data[2:2 + byte_len]
try:
long_size = struct.unpack(">h", len_bytes)
except struct.error:
long_size = (-1,) # yapf: disable
length = long_size[0]
elif 0x80 < length and 1 == byte_len: # small size
length = data[2]
else:
byte_len = 0
return data_length == length + 2 + byte_len
Expand Down Expand Up @@ -613,3 +616,17 @@ def parse_python(source: str) -> List[Any]:
src = ast.parse(source)
result = Util.ast_to_dict(src)
return result

@staticmethod
def decode_base64(text: str, padding_safe: bool = False, urlsafe_detect=False) -> bytes:
"""decode text to bytes with / without padding detect and urlsafe symbols"""
value = text
if padding_safe:
pad_num = 0x3 & len(value)
if pad_num:
value += '=' * (4 - pad_num)
if urlsafe_detect and ('-' in value or '_' in value):
decoded = base64.b64decode(value, altchars=b"-_", validate=True)
else:
decoded = base64.b64decode(value, validate=True)
return decoded
2 changes: 1 addition & 1 deletion tests/filters/test_value_base64_data_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def test_value_entropy_check_p(self, file_path: pytest.fixture, line: str) -> No
line_data = get_line_data(file_path, line=line, pattern=LINE_VALUE_PATTERN)
assert ValueBase64DataCheck().run(line_data, DUMMY_ANALYSIS_TARGET) is False

@pytest.mark.parametrize("line", ["eyJ0eXAiOiJKV1QiLC", "2AA219GG746F88F6DDA0D852A0FD3211"])
@pytest.mark.parametrize("line", ["eyJ0eXAiOiJKV1QiLC", "2AA219GG746F88F6DDA0D852A0FD3211", "!@#$%^&*(_)0aA"])
def test_value_entropy_check_n(self, file_path: pytest.fixture, line: str) -> None:
line_data = get_line_data(file_path, line=line, pattern=LINE_VALUE_PATTERN)
assert ValueBase64DataCheck().run(line_data, DUMMY_ANALYSIS_TARGET) is True
Expand Down
22 changes: 22 additions & 0 deletions tests/utils/test_util.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import base64
import binascii
import os
import random
import string
Expand Down Expand Up @@ -511,3 +512,24 @@ def test_parse_py_n(self):
# wrong syntax
with self.assertRaises(SyntaxError):
self.assertFalse(Util.parse_python("""<html>"Hello World!"</html>"""))

def test_decode_base64_p(self):
self.assertEqual(AZ_DATA, Util.decode_base64("VGhlIHF1aWNrIGJyb3duIGZveCBqdW1wcyBvdmVyIHRoZSBsYXp5IGRvZw=="))
self.assertEqual(b"\xFF\xFF\xFF", Util.decode_base64("////"))
self.assertEqual(b"\xFB\xEF\xBE", Util.decode_base64("++++"))
self.assertEqual(b"\xFF\xFF\xFF", Util.decode_base64("____", urlsafe_detect=True))
self.assertEqual(b"\xFB\xEF\xBE", Util.decode_base64("----", urlsafe_detect=True))
self.assertEqual(b"\xFF\xFE", Util.decode_base64("//4", padding_safe=True))
self.assertEqual(b"\xFF\xFE", Util.decode_base64("__4", padding_safe=True, urlsafe_detect=True))

def test_decode_base64_n(self):
with self.assertRaises(binascii.Error):
Util.decode_base64("VGhlIHF1aWNrIGJyb3duIGZveCBqdW1wcyBvdmVyIHRoZSBsYXp5IGRvZw")
with self.assertRaises(binascii.Error):
Util.decode_base64("-_+_-", padding_safe=True, urlsafe_detect=True)
with self.assertRaises(binascii.Error):
Util.decode_base64("/** ! */", urlsafe_detect=True)
with self.assertRaises(binascii.Error):
Util.decode_base64("____")
with self.assertRaises(binascii.Error):
Util.decode_base64("----")

0 comments on commit 35be458

Please sign in to comment.