Skip to content

Commit

Permalink
Merge pull request #117 from certego/develop
Browse files Browse the repository at this point in the history
1.4.0
  • Loading branch information
Lorygold authored Jan 27, 2025
2 parents 72cf44c + 3945b9a commit 70cf464
Show file tree
Hide file tree
Showing 18 changed files with 792 additions and 49 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 1.4.x
### 1.4.0
#### Features
* Implemented filter logic based on the custom Config set

## 1.3.x
### 1.3.2
### Changes
Expand Down
1 change: 1 addition & 0 deletions buffalogs/buffalogs/settings/certego.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
CERTEGO_BUFFALOGS_LOGIN_MAX_DAYS = 30
CERTEGO_BUFFALOGS_ALERT_MAX_DAYS = 30
CERTEGO_BUFFALOGS_IP_MAX_DAYS = 30
CERTEGO_BUFFALOGS_MOBILE_DEVICES = ["iOS", "Android", "Windows Phone"]

if CERTEGO_BUFFALOGS_ENVIRONMENT == ENVIRONMENT_DOCKER:

Expand Down
67 changes: 60 additions & 7 deletions buffalogs/impossible_travel/admin.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from django.contrib import admin
from django.contrib.admin.models import CHANGE, LogEntry
from django.contrib.contenttypes.models import ContentType
from django.utils import timezone

from .forms import AlertAdminForm, ConfigAdminForm, UserAdminForm
Expand All @@ -16,17 +18,18 @@ class LoginAdmin(admin.ModelAdmin):
"latitude",
"longitude",
"country",
"user_agent",
"index",
"ip",
"user_agent",
"event_id",
"index",
)
search_fields = ("id", "user__username", "user_agent", "index", "event_id", "ip")
search_fields = ("id", "user__username", "user_agent", "index", "event_id", "ip", "country")

@admin.display(description="username")
def get_username(self, obj):
return obj.user.username

@admin.display(description="timestamp")
def timestamp_display(self, obj):
# Usa strftime per personalizzare il formato
return obj.timestamp.astimezone(timezone.get_current_timezone()).strftime("%b %d, %Y, %I:%M:%S %p %Z")
Expand All @@ -46,8 +49,8 @@ def get_risk_score_value(self, obj):
@admin.register(Alert)
class AlertAdmin(admin.ModelAdmin):
form = AlertAdminForm
list_display = ("id", "created", "updated", "get_username", "get_alert_value", "description", "login_raw_data", "is_vip")
search_fields = ("user__username", "name", "is_vip")
list_display = ("id", "created", "updated", "get_username", "get_alert_value", "description", "login_raw_data", "is_vip", "is_filtered")
search_fields = ("id", "user__username", "name", "is_filtered")

@admin.display(description="username")
def get_username(self, obj):
Expand All @@ -67,8 +70,58 @@ class TaskSettingsAdmin(admin.ModelAdmin):
@admin.register(Config)
class ConfigsAdmin(admin.ModelAdmin):
form = ConfigAdminForm
list_display = ("created", "updated", "ignored_users", "ignored_ips", "ignored_ISPs", "allowed_countries", "vip_users")
search_fields = ("allowed_countries", "vip_users")
fieldsets = [
("Detection filters - users", {"fields": ("ignored_users", "enabled_users", "vip_users", "alert_is_vip_only", "alert_minimum_risk_score")}),
("Detection filters - location", {"fields": ("ignored_ips", "allowed_countries")}),
("Detection filters - devices", {"fields": ("ignored_ISPs", "ignore_mobile_logins")}),
("Detection filters - alerts", {"fields": ("filtered_alerts_types",)}),
("Detection setup - Impossible Travel alerts", {"fields": ("distance_accepted", "vel_accepted")}),
("Detection setup - Clean models", {"fields": ("user_max_days", "login_max_days", "alert_max_days", "ip_max_days")}),
]
list_display = (
"id",
"created",
"updated",
"ignored_users",
"enabled_users",
"ignored_ips",
"get_minimum_risk_score_value",
"allowed_countries",
"filtered_alerts_types",
"alert_is_vip_only",
"ignore_mobile_logins",
)
search_fields = ("id",)

@admin.display(description="Minimum user risk_score")
def get_minimum_risk_score_value(self, obj):
return obj.alert_minimum_risk_score

def save_model(self, request, obj, form, change):
if change:
changes = []
for field in form.changed_data:
old_value = form.initial.get(field)
new_value = form.cleaned_data.get(field)

# Aggiungi un log dettagliato con valori precedenti e nuovi
changes.append(f"{field} changed from {old_value} to {new_value}")

if changes:
self.log_change(request, obj, ", ".join(changes))

super().save_model(request, obj, form, change)

def log_change(self, request, obj, message):
"""Log the detailed message of changes"""
LogEntry.objects.log_action(
user_id=request.user.pk,
content_type_id=ContentType.objects.get_for_model(obj).pk,
object_id=obj.pk,
object_repr=str(obj),
action_flag=CHANGE,
change_message=message,
)


@admin.register(UsersIP)
Expand Down
11 changes: 8 additions & 3 deletions buffalogs/impossible_travel/constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from enum import Enum

from django.db import models
from django.utils.translation import gettext_lazy as _

Expand All @@ -19,7 +17,7 @@ class UserRiskScoreType(models.TextChoices):
HIGH = "High", _("User has a high risk")

@classmethod
def get_risk_level(cls, value):
def get_risk_level(cls, value: int):
# map risk value
if value == 0:
return cls.NO_RISK.value
Expand All @@ -32,6 +30,13 @@ def get_risk_level(cls, value):
else:
raise ValueError("Risk value not valid")

@classmethod
def is_equal_or_higher(cls, threshold, value):
# check if the value is equal or higher than the threshold
if UserRiskScoreType.values.index(value) >= UserRiskScoreType.values.index(threshold):
return True
return False


class AlertDetectionType(models.TextChoices):
"""Types of possible alert detections in the format (name=value,label)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# Generated by Django 4.2.16 on 2025-01-27 07:43

import django.contrib.postgres.fields
from django.db import migrations, models
import impossible_travel.models
import impossible_travel.validators


class Migration(migrations.Migration):

dependencies = [
("impossible_travel", "0011_alert_filter_type_alert_is_filtered_and_more"),
]

operations = [
migrations.RemoveConstraint(
model_name="alert",
name="valid_alert_filter_type_choices",
),
migrations.RemoveConstraint(
model_name="config",
name="valid_alert_filters_choices",
),
migrations.AlterField(
model_name="config",
name="enabled_users",
field=django.contrib.postgres.fields.ArrayField(
base_field=models.CharField(max_length=50),
blank=True,
default=impossible_travel.models.get_default_enabled_users,
help_text="List of selected users (strings or regex patterns) on which the detection will perform - If this field is not empty, the ignored_users field is ignored",
null=True,
size=None,
validators=[impossible_travel.validators.validate_string_or_regex],
),
),
migrations.AlterField(
model_name="config",
name="ignored_ips",
field=django.contrib.postgres.fields.ArrayField(
base_field=models.CharField(max_length=50),
blank=True,
default=impossible_travel.models.get_default_ignored_ips,
help_text="List of IPs to remove from the detection",
null=True,
size=None,
validators=[impossible_travel.validators.validate_ips_or_network],
),
),
migrations.AlterField(
model_name="config",
name="ignored_users",
field=django.contrib.postgres.fields.ArrayField(
base_field=models.CharField(max_length=50),
blank=True,
default=impossible_travel.models.get_default_ignored_users,
help_text="List of users (strings or regex patterns) to be ignored from the detection",
null=True,
size=None,
validators=[impossible_travel.validators.validate_string_or_regex],
),
),
migrations.AddConstraint(
model_name="alert",
constraint=models.CheckConstraint(
check=models.Q(
(
"filter_type__contained_by",
[
"ignored_users filter",
"ignored_ips filter",
"allowed_countries filter",
"is_vip_filter",
"alert_minimum_risk_score filter",
"filtered_alerts_types filter",
"ignore_mobile_logins filter",
"ignored_ISPs filter",
],
),
("filter_type", []),
_connector="OR",
),
name="valid_alert_filter_type_choices",
),
),
migrations.AddConstraint(
model_name="config",
constraint=models.CheckConstraint(
check=models.Q(
(
"filtered_alerts_types__contained_by",
[
"New Device",
"Imp Travel",
"New Country",
"User Risk Threshold",
"Login Anonymizer Ip",
"Atypical Country",
],
),
("filtered_alerts_types__isnull", True),
_connector="OR",
),
name="valid_alert_filters_choices",
),
),
]
33 changes: 23 additions & 10 deletions buffalogs/impossible_travel/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from django.db import models
from django.utils import timezone
from impossible_travel.constants import AlertDetectionType, AlertFilterType, UserRiskScoreType
from impossible_travel.validators import validate_ips_or_network, validate_string_or_regex


class User(models.Model):
Expand All @@ -18,7 +19,7 @@ def __str__(self):
class Meta:
constraints = [
models.CheckConstraint(
# Check that the User.risk_score is one of the value in the Enum UserRiskScoreType
# Check that the User.risk_score is one of the value in the Enum UserRiskScoreType --> ['No risk', 'Low', 'Medium', 'High']
check=models.Q(risk_score__in=[choice[0] for choice in UserRiskScoreType.choices]),
name="valid_user_risk_score_choice",
)
Expand Down Expand Up @@ -58,13 +59,13 @@ class Alert(models.Model):
class Meta:
constraints = [
models.CheckConstraint(
# Check that the Alert.name is one of the value in the Enum AlertDetectionType
# Check that the Alert.name is one of the value in the Enum AlertDetectionType --> ['New Device', 'Imp Travel', 'New Country', 'User Risk Threshold', 'Login Anonymizer Ip', 'Atypical Country']
check=models.Q(name__in=[choice[0] for choice in AlertDetectionType.choices]),
name="valid_alert_name_choice",
),
models.CheckConstraint(
# Check that each element in the Alert.filter_type is in the Enum AlertFilterType
check=models.Q(filter_type__contained_by=AlertFilterType.choices),
# Check that each element in the Alert.filter_type is in the Enum AlertFilterType --> ['ignored_users filter', 'ignored_ips filter', 'allowed_countries filter', 'is_vip_filter', 'alert_minimum_risk_score filter', 'filtered_alerts_types filter', 'ignore_mobile_logins filter', 'ignored_ISPs filter']
check=models.Q(filter_type__contained_by=[choice[0] for choice in AlertFilterType.choices]) | models.Q(filter_type=[]),
name="valid_alert_filter_type_choices",
),
]
Expand Down Expand Up @@ -113,17 +114,28 @@ class Config(models.Model):
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
ignored_users = ArrayField(
models.CharField(max_length=50), blank=True, null=True, default=get_default_ignored_users, help_text="List of users to be ignored from the detection"
models.CharField(max_length=50),
blank=True,
null=True,
default=get_default_ignored_users,
validators=[validate_string_or_regex],
help_text="List of users (strings or regex patterns) to be ignored from the detection",
)
enabled_users = ArrayField(
models.CharField(max_length=50),
blank=True,
null=True,
default=get_default_enabled_users,
help_text="List of selected users on which the detection will perform",
validators=[validate_string_or_regex],
help_text="List of selected users (strings or regex patterns) on which the detection will perform - If this field is not empty, the ignored_users field is ignored",
)
ignored_ips = ArrayField(
models.CharField(max_length=50), blank=True, null=True, default=get_default_ignored_ips, help_text="List of IPs to remove from the detection"
models.CharField(max_length=50),
blank=True,
null=True,
default=get_default_ignored_ips,
validators=[validate_ips_or_network],
help_text="List of IPs to remove from the detection",
)
ignored_ISPs = ArrayField(
models.CharField(max_length=50), blank=True, null=True, default=get_default_ignored_ISPs, help_text="List of ISPs names to remove from the detection"
Expand Down Expand Up @@ -187,13 +199,14 @@ def save(self, *args, **kwargs):
class Meta:
constraints = [
models.CheckConstraint(
# Check that the Config.alert_minimum_risk_score is one of the value in the Enum UserRiskScoreType
# Check that the Config.alert_minimum_risk_score is one of the value in the Enum UserRiskScoreType --> ['No risk', 'Low', 'Medium', 'High']
check=models.Q(alert_minimum_risk_score__in=[choice[0] for choice in UserRiskScoreType.choices]),
name="valid_config_alert_minimum_risk_score_choice",
),
models.CheckConstraint(
# Check that each element in the Config.filtered_alerts_types is in the Enum AlertFilterType
check=models.Q(filtered_alerts_types__contained_by=AlertFilterType.choices),
# Check that each element in the Config.filtered_alerts_types is blank or it's in the Enum AlertFilterType --> ['New Device', 'Imp Travel', 'New Country', 'User Risk Threshold', 'Login Anonymizer Ip', 'Atypical Country']
check=models.Q(filtered_alerts_types__contained_by=[choice[0] for choice in AlertDetectionType.choices])
| models.Q(filtered_alerts_types__isnull=True),
name="valid_alert_filters_choices",
),
]
Loading

0 comments on commit 70cf464

Please sign in to comment.