Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V4.0.1/hotfix #55

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ ignore = E203, E266, E501, W503, B006, B007, B008, F401, C416, B950, B904
max-line-length = 88
max-complexity = 18
select = B,C,E,F,W,T4,B9
exclude = venv, .venv, tests/.datafog_env, examples/venv
exclude = venv, .venv, tests/.datafog_env, examples/venv, script.py
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ error_log.txt
docs/*
!docs/*.rst
!docs/conf.py
scratch.py
scratch.py
script.py
3 changes: 1 addition & 2 deletions datafog/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from .__about__ import __version__
from .client import app
from .config import OperationType, get_config
from .main import DataFog, TextPIIAnnotator
from .main import DataFog
from .models.annotator import (
AnalysisExplanation,
AnnotationResult,
Expand Down Expand Up @@ -30,7 +30,6 @@
"ImageService",
"OperationType",
"SparkService",
"TextPIIAnnotator",
"TextService",
"SpacyPIIAnnotator",
"ImageDownloader",
Expand Down
124 changes: 39 additions & 85 deletions datafog/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from .config import OperationType
from .models.anonymizer import Anonymizer, AnonymizerType, HashType
from .processing.text_processing.spacy_pii_annotator import SpacyPIIAnnotator
from .models.spacy_nlp import SpacyAnnotator
from .services.image_service import ImageService
from .services.spark_service import SparkService
from .services.text_service import TextService
Expand All @@ -36,6 +36,7 @@ class DataFog:
spark_service: Optional Spark service for distributed processing.
operations: List of operations to perform.
anonymizer: Anonymizer for PII redaction, replacement, or hashing.
annotator: SpacyAnnotator instance for text annotation.
"""

def __init__(
Expand All @@ -54,6 +55,7 @@ def __init__(
self.anonymizer = Anonymizer(
hash_type=hash_type, anonymizer_type=anonymizer_type
)
self.annotator = SpacyAnnotator()
self.logger = logging.getLogger(__name__)
self.logger.info(
"Initializing DataFog class with the following services and operations:"
Expand Down Expand Up @@ -120,63 +122,51 @@ async def _process_text(self, text_list: List[str]):
"""
Internal method to process text based on enabled operations.
"""
if OperationType.SCAN in self.operations:
annotated_text = await self.text_service.batch_annotate_text_async(
text_list
)
self.logger.info(
f"Text annotation completed with {len(annotated_text)} annotations."
)

if OperationType.REDACT in self.operations:
return [
self.anonymizer.anonymize(
text, annotations, AnonymizerType.REDACT
).anonymized_text
for text, annotations in zip(text_list, annotated_text, strict=True)
]
elif OperationType.REPLACE in self.operations:
return [
self.anonymizer.anonymize(
text, annotations, AnonymizerType.REPLACE
).anonymized_text
for text, annotations in zip(text_list, annotated_text, strict=True)
]
elif OperationType.HASH in self.operations:
return [
self.anonymizer.anonymize(
text, annotations, AnonymizerType.HASH
).anonymized_text
for text, annotations in zip(text_list, annotated_text, strict=True)
try:
if OperationType.SCAN in self.operations:
annotated_text = [
self.annotator.annotate_text(text) for text in text_list
]
else:

if OperationType.REDACT in self.operations:
self.anonymizer.anonymizer_type = AnonymizerType.REDACT
elif OperationType.REPLACE in self.operations:
self.anonymizer.anonymizer_type = AnonymizerType.REPLACE
elif OperationType.HASH in self.operations:
self.anonymizer.anonymizer_type = AnonymizerType.HASH

if any(
op in self.operations
for op in [
OperationType.REDACT,
OperationType.REPLACE,
OperationType.HASH,
]
):
return [
self.anonymizer.anonymize(text, annotations).anonymized_text
for text, annotations in zip(
text_list, annotated_text, strict=True
)
]
return annotated_text

self.logger.info(
"No annotation or anonymization operation found; returning original texts."
)
return text_list
return text_list
except Exception as e:
self.logger.error(f"Error in _process_text: {str(e)}")
raise

def run_text_pipeline_sync(self, str_list: List[str]) -> List[str]:
"""
Run the text pipeline synchronously on a list of input text.

Args:
str_list (List[str]): A list of text strings to be processed.

Returns:
List[str]: Processed text results based on the enabled operations.

Raises:
Exception: Any error encountered during the text processing.
"""
try:
self.logger.info(f"Starting text pipeline with {len(str_list)} texts.")
self.logger.info(f"Starting text pipeline with {len(str_list)} texts")

if OperationType.SCAN in self.operations:
annotated_text = self.text_service.batch_annotate_text_sync(str_list)
self.logger.info(
f"Text annotation completed with {len(annotated_text)} annotations."
)
annotated_text = [
self.annotator.annotate_text(text) for text in str_list
]

if any(
op in self.operations
Expand All @@ -192,12 +182,8 @@ def run_text_pipeline_sync(self, str_list: List[str]) -> List[str]:
str_list, annotated_text, strict=True
)
]
else:
return annotated_text
return annotated_text

self.logger.info(
"No annotation or anonymization operation found; returning original texts."
)
return str_list
except Exception as e:
self.logger.error(f"Error in run_text_pipeline_sync: {str(e)}")
Expand All @@ -222,35 +208,3 @@ def _add_attributes(self, attributes: dict):
"""
for key, value in attributes.items():
setattr(self, key, value)


class TextPIIAnnotator:
    """
    Class for annotating PII in text.

    Provides functionality to detect and annotate Personally Identifiable
    Information (PII) in text.

    Attributes:
        text_annotator: SpacyPIIAnnotator instance for text annotation.
        spark_processor: Optional SparkService for distributed processing.
    """

    def __init__(self):
        self.text_annotator = SpacyPIIAnnotator.create()
        # None by default; callers may attach a SparkService for distributed
        # workloads. The string annotation documents the optional type
        # (the previous bare `SparkService` annotation was type-incorrect
        # for a None initializer).
        self.spark_processor: "SparkService | None" = None

    def run(self, text, output_path=None):
        """
        Annotate PII entities in *text*.

        Args:
            text: Input string to scan for PII.
            output_path: Optional file path; when given, the annotation
                result is also serialized to this file as JSON.

        Returns:
            The annotation result produced by SpacyPIIAnnotator.annotate.
        """
        try:
            annotated_text = self.text_annotator.annotate(text)

            # Optionally, output the results to a JSON file.
            if output_path:
                with open(output_path, "w") as f:
                    json.dump(annotated_text, f)

            return annotated_text

        finally:
            # Ensure Spark resources are released even if annotation fails.
            if self.spark_processor:
                self.spark_processor.stop()
38 changes: 38 additions & 0 deletions script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from datafog import DataFog
from datafog.config import OperationType
from datafog.models.anonymizer import Anonymizer, AnonymizerType
from datafog.models.spacy_nlp import SpacyAnnotator

# Demo client: scan for PII, then redact what was found.
client = DataFog(operations=[OperationType.SCAN, OperationType.REDACT])

text = "Tim Cook is the CEO of Apple and is based out of Cupertino, California"

# README Implementation
print(client.run_text_pipeline_sync([text])[0])

# Correct Implementation
# annotator = SpacyAnnotator()
# anonymizer = Anonymizer(anonymizer_type=AnonymizerType.REDACT)
# annotations = annotator.annotate_text(text)
# result = anonymizer.anonymize(text, annotations)
# print(result.anonymized_text)


# Sample redaction using DataFog main.py implementation
sample_texts = [
    "John Smith lives at 123 Main St in New York",
    "Contact Sarah Jones at [email protected] or (555) 123-4567",
    "SSN: 123-45-6789 belongs to Michael Wilson",
]

# Second, identically configured client for the batch demo.
datafog_client = DataFog(operations=[OperationType.SCAN, OperationType.REDACT])

# Run the whole batch through the synchronous pipeline before printing,
# so pipeline log output is not interleaved with the report below.
redacted_results = datafog_client.run_text_pipeline_sync(sample_texts)

print("Original vs Redacted texts:")
for plain, masked in zip(sample_texts, redacted_results):
    print("\nOriginal:", plain)
    print("Redacted:", masked)
Loading