Commit

Merge pull request #4692 from pathwaycom/moha/pdf-to-table
parse pdfs into table

GitOrigin-RevId: c2ea469ffd7011727b7013eb866da3aaf23114d3
olruas authored and Manul from Pathway committed Oct 27, 2023
1 parent 0ca8fb1 commit 270bb7a
Showing 18 changed files with 460 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
- Adding the "unstructured to SQL on the fly" example

## [0.3.1] - 2023-10-19

3 changes: 2 additions & 1 deletion README.md
@@ -101,6 +101,7 @@ To get started explore one of the examples:
| [`contextful_s3`](examples/pipelines/contextful_s3/app.py) | This example operates similarly to the contextful mode. The main difference is that the documents are stored and indexed from an S3 bucket, allowing the handling of a larger volume of documents. This can be more suitable for production environments. |
| [`unstructured`](examples/pipelines/unstructured/app.py) | Process unstructured documents such as PDF, HTML, DOCX, PPTX and more. Visit [unstructured-io](https://unstructured-io.github.io/unstructured/) for the full list of supported formats. |
| [`local`](examples/pipelines/local/app.py) | This example runs the application using Huggingface Transformers, which eliminates the need for the data to leave the machine. It provides a convenient way to use state-of-the-art NLP models locally. |
| [`unstructuredtosql`](examples/pipelines/unstructured_to_sql_on_the_fly/app.py) | This example extracts data from unstructured files and stores it in a PostgreSQL table. It also translates the user query into a SQL query which is executed on that table. |

Follow these easy steps to install and get started with your favorite examples. You can also take a look at the [application showcases](#showcases).

@@ -124,7 +125,7 @@ Create an .env file in the root directory and add the following environment variables:

| Environment Variable | Description |
| --------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| APP_VARIANT | Determines which pipeline to run in your application. Available modes are [`contextful`,`s3`, `contextless`, `local`]. By default, the mode is set to`contextful`. |
| APP_VARIANT | Determines which pipeline to run in your application. Available modes are [`contextful`, `s3`, `contextless`, `local`, `unstructuredtosql`]. By default, the mode is set to `contextful`. |
| PATHWAY_REST_CONNECTOR_HOST | Specifies the host IP for the REST connector in Pathway. For the dockerized version, set it to `0.0.0.0`. Natively, you can use `127.0.0.1`. |
| PATHWAY_REST_CONNECTOR_PORT | Specifies the port number on which the REST connector service of Pathway should listen. Here, it is set to 8080. |
| OPENAI_API_TOKEN | The API token for accessing OpenAI services. If you are not running the local version, please remember to replace it with your personal API token, which you can generate from your account on [openai.com](https://platform.openai.com/account/api-keys). |
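A minimal `.env` for this pipeline might look like the following sketch (placeholder values; substitute your own OpenAI key):

```
APP_VARIANT=unstructuredtosql
PATHWAY_REST_CONNECTOR_HOST=0.0.0.0
PATHWAY_REST_CONNECTOR_PORT=8080
OPENAI_API_TOKEN=sk-...
```

The PostgreSQL connection used by this pipeline can also be overridden via `POSTGRESQL_HOST`, `POSTGRESQL_PORT`, `POSTGRESQL_DB`, `POSTGRESQL_USER`, `POSTGRESQL_PASSWORD` and `POSTGRESQL_TABLE` (see the defaults in `app.py` below).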
7 binary files not shown.
4 changes: 0 additions & 4 deletions examples/pipelines/unstructured/app.py
@@ -34,10 +34,6 @@
from llm_app.model_wrappers import OpenAIChatGPTModel, OpenAIEmbeddingModel


class DocumentInputSchema(pw.Schema):
doc: str


class QueryInputSchema(pw.Schema):
query: str
user: str
3 changes: 3 additions & 0 deletions examples/pipelines/unstructured_to_sql_on_the_fly/__init__.py
@@ -0,0 +1,3 @@
from .app import run

__all__ = ["run"]
343 changes: 343 additions & 0 deletions examples/pipelines/unstructured_to_sql_on_the_fly/app.py
@@ -0,0 +1,343 @@
"""
Microservice for accounting assistant.
The aim of this project is to extract and structure data out of unstructured inputs (PDF documents and
natural-language queries) on the fly.
The following program reads in a collection of financial PDF documents from a local directory
(that can be synchronized with a Dropbox account), tokenizes each document using the tiktoken encoding,
then uses the OpenAI API to extract the desired fields.
The values are stored in a Pathway table which is then output to a PostgreSQL instance.
The program then starts a REST API endpoint serving natural-language queries about the extracted financial data.
Each query text is converted into a SQL query using the OpenAI API.
The diagram is available at:
https://github.com/pathwaycom/llm-app/examples/pipelines/unstructured_to_sql_on_the_fly/Unstructured_to_SQL_diagram.png
⚠️ This project requires a running postgreSQL instance.
🔵 The extracted fields from the PDF documents are the following:
- company_symbol: str
- year: int
- quarter: str
- revenue_md: float
- eps: float
- net_income_md: float
⚠️ The revenue and net income are expressed in millions of dollars; the EPS is in dollars (an illustrative reply is shown below).
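For illustration, the LLM's reply is expected to be a JSON object such as
'{"company_symbol": "XYZ", "year": 2023, "quarter": "Q2", "revenue_md": 1234.5, "eps": 0.42, "net_income_md": 321.0}'
(hypothetical values shown here).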
🔵 The script uses a prompt to instruct the Language Model and generate SQL queries that adhere to the specified format.
The allowed queries follow a particular pattern:
1. The SELECT clause should specify columns or standard aggregator operators (SUM, COUNT, MIN, MAX, AVG).
2. The WHERE clause should include conditions using standard binary operators (<, >, =, etc.),
with support for AND and OR logic.
3. To prevent 'psycopg2.errors.GroupingError', relevant columns from the WHERE clause are included
in the GROUP BY clause.
4. For readability, if no aggregators are used, the company_symbol, year,
   and quarter are included in addition to the requested columns.
Example:
"What is the net income of all companies?" should return:
Response:
'SELECT company_symbol, year, quarter, net_income_md FROM table;'
🔵 Project architecture:
```
.
├── postgresql/
│ ├── docker-compose.yml
│ └── init-db.sql
├── ui/
│   └── server.py
├── __init__.py
└── app.py
```
🔵 PostgreSQL:
A PostgreSQL docker compose project is provided in
`examples/pipelines/unstructured_to_sql_on_the_fly/postgres/`. To start it, run
`docker compose up -d` inside that directory.
🔵 Usage:
In the root of this repository run:
`poetry run ./run_examples.py unstructuredtosql`
or, if all dependencies are managed manually rather than using poetry
`python examples/pipelines/unstructured_to_sql_on_the_fly/app.py`
You can also run this example directly in the environment with llm_app installed.
To call the REST API:
curl --data '{
"user": "user",
"query": "What is the maximum quarterly revenue achieved by Apple?"
}' http://localhost:8080/ | jq
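Equivalently, from Python (a minimal sketch, assuming the pipeline is running locally on port 8080):
```
import requests

r = requests.post(
    "http://localhost:8080/",
    json={"user": "user", "query": "What is the maximum quarterly revenue achieved by Apple?"},
)
print(r.text)  # the (sql_query, result) pair produced by the pipeline
```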
To call the Streamlit interface:
`streamlit run examples/pipelines/unstructured_to_sql_on_the_fly/ui/server.py`
🔵 Notes and TODOs:
- The project contains two distinct and non-overlapping parts:
  1) Extracting the data from PDFs in real time and storing it in a PostgreSQL table.
  2) Transforming the user query into a SQL query and then executing it.
  These could live in two separate Python files.
- TODO: data extraction needs data cleaning as it may be prone to errors. Anomaly detection
could be a nice next step to detect and possibly correct outliers.
"""
import json
import logging
import os

import pathway as pw
import psycopg2
import tiktoken
from pathway.stdlib.utils.col import unpack_col

from llm_app import extract_texts
from llm_app.model_wrappers import OpenAIChatGPTModel


class FinancialStatementSchema(pw.Schema):
company_symbol: str
year: int
quarter: str
revenue_md: float
eps: float
net_income_md: float


class NLQuerySchema(pw.Schema):
query: str
user: str


@pw.udf
def build_prompt_structure(
texts: list[str], max_tokens: int = 8000, encoding_name: str = "cl100k_base"
):
"""
    Build the prompt instructing the LLM to fill FinancialStatementSchema from the document text.
    max_tokens is the context budget; if gpt-3.5-turbo-16k is used, it can be set to 16k.
"""
docs_str = " ".join(texts)
encoding = tiktoken.get_encoding(encoding_name)
prompt_prefix = "Given the following quarterly earnings release : \n"
prompt_suffix = (
f" \nfill in this schema for the quarter in question {FinancialStatementSchema.typehints()}\n"
+ """while respecting the instructions:
- amounts should be in millions of dollars.
- Parse quarterly data and ignore yearly records if present.
- Your answer should be parseable by json. i.e. json.loads(response) doesn't throw any errors."""
)

prefix_tokens = len(list(encoding.encode_ordinary(prompt_prefix)))
suffix_tokens = len(list(encoding.encode_ordinary(prompt_suffix)))

# Calculate available tokens for docs_str
available_tokens = max_tokens - (prefix_tokens + suffix_tokens)

# Tokenize docs_str and truncate if needed
doc_tokens = list(encoding.encode_ordinary(docs_str))
if len(doc_tokens) > available_tokens:
logging.warning("Document is too large for one query.")
docs_str = encoding.decode(doc_tokens[:available_tokens])

prompt = prompt_prefix + docs_str + prompt_suffix
return prompt


@pw.udf
def build_prompt_query(postresql_table: str, query: str) -> str:
prompt = f"""Transform the given query '{query}' into a specific SQL SELECT statement format.
For invalid queries, return the string 'None'. The result should be executable in PostgreSQL.
The query should include the following components:
The SELECT clause should specify one or more columns from the table {postresql_table}.
You can use column names or standard aggregator operators such as SUM, COUNT, MIN, MAX, or AVG
to retrieve data from the columns.
The WHERE clause should include conditions that use standard binary operators (e.g., <, >, =) to filter the data.
You can use AND and OR operators to combine multiple conditions.
If any columns from the WHERE clause are used in the conditions, please ensure that those columns are included
in the GROUP BY clause to prevent the 'psycopg2.errors.GroupingError.'
You may use logical reasoning to decide which columns should be part of the GROUP BY clause.
The columns are from {postresql_table} table whose schema is:
company_symbol (str)
year (int)
quarter (str)
revenue_md (float)
eps (float)
net_income_md (float)
Quarter values are Q1, Q2, Q3 or Q4.
The company_symbol is the stock name: for example AAPL for Apple and GOOG for Google.
    If no aggregators are used, please always add the company_symbol, year,
    and quarter in addition to the requested columns:
    "What is the net income of all companies?" should return:
    'SELECT company_symbol, year, quarter, net_income_md FROM {postresql_table};'
Please ensure that the generated SQL query follows this structure and constraints.
For example, a valid query might look like:
'SELECT company_symbol, SUM(net_income_md) FROM {postresql_table}
WHERE year = 2022 AND eps > 1.0 GROUP BY company_symbol;'
Make sure the query adheres to the specified format,
and do not include any other SQL commands or clauses besides the SELECT statement.
Thank you!"""
return prompt


@pw.udf
def parse_str_to_list(response: str) -> list:
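    """Parse the LLM's JSON reply and return its values ordered by sorted key name,
    matching sorted(FinancialStatementSchema.keys()) used by unpack_col below."""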
dct = json.loads(response)
return [dct[k] for k in sorted(dct)]


def structure_on_the_fly(
documents: pw.Table,
api_key: str,
model_locator: str,
max_tokens: int,
temperature: float,
):
prompt = documents.select(prompt=build_prompt_structure(pw.this.texts))

model = OpenAIChatGPTModel(api_key=api_key)

responses = prompt.select(
result=model.apply(
pw.this.prompt,
locator=model_locator,
temperature=temperature,
max_tokens=max_tokens,
),
)

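    # Parse each JSON reply into a list of values (ordered by sorted column name),
    # then unpack it into one typed column per FinancialStatementSchema field.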
responses = responses.select(values=parse_str_to_list(pw.this.result))
result = unpack_col(responses.values, *sorted(FinancialStatementSchema.keys()))
result = result.select(
*pw.this.without(pw.this.eps, pw.this.net_income_md, pw.this.revenue_md),
eps=pw.apply_with_type(float, float, pw.this.eps),
net_income_md=pw.apply_with_type(float, float, pw.this.net_income_md),
revenue_md=pw.apply_with_type(float, float, pw.this.revenue_md),
)
return result


def unstructured_query(
postgreSQL_settings,
postgreSQL_table,
api_key: str,
model_locator: str,
max_tokens: int,
temperature: float,
host: str,
port: int,
):
query, response_writer = pw.io.http.rest_connector(
host=host,
port=port,
schema=NLQuerySchema,
autocommit_duration_ms=50,
)

query += query.select(prompt=build_prompt_query(postgreSQL_table, pw.this.query))

model = OpenAIChatGPTModel(api_key=api_key)

query += query.select(
sql_query=model.apply(
pw.this.prompt,
locator=model_locator,
temperature=temperature,
max_tokens=max_tokens,
),
)

# Connecting to the document database for queries
conn = psycopg2.connect(
database=postgreSQL_settings["dbname"],
host=postgreSQL_settings["host"],
user=postgreSQL_settings["user"],
password=postgreSQL_settings["password"],
port=postgreSQL_settings["port"],
)
cursor = conn.cursor()

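    # Each LLM-generated SQL statement is executed against the shared PostgreSQL
    # connection; the raw rows are returned and sent back via the REST connector.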
@pw.udf
def execute_sql_query(sql_query):
cursor.execute(sql_query)
answer = cursor.fetchall()
# answer = answer[0][0]
conn.commit()
return answer

query = query.select(
pw.this.query,
pw.this.sql_query,
result=execute_sql_query(
pw.this.sql_query,
),
)
answers = query.select(result=pw.make_tuple(pw.this.sql_query, pw.this.result))
response_writer(answers)


def run(
*,
data_dir: str = os.environ.get("PATHWAY_DATA_DIR", "./examples/data/q_earnings/"),
api_key: str = os.environ.get("OPENAI_API_TOKEN", ""),
host: str = "0.0.0.0",
port: int = 8080,
    model_locator: str = "gpt-3.5-turbo-16k",  # alternatively "gpt-4"
max_tokens: int = 60,
temperature: float = 0.0,
postresql_host: str = os.environ.get("POSTGRESQL_HOST", "localhost"),
postresql_port: str = os.environ.get("POSTGRESQL_PORT", "5432"),
postresql_db: str = os.environ.get("POSTGRESQL_DB", "STRUCTUREDDB"),
postresql_user: str = os.environ.get("POSTGRESQL_USER", "user"),
postresql_password: str = os.environ.get("POSTGRESQL_PASSWORD", "password"),
postresql_table: str = os.environ.get("POSTGRESQL_TABLE", "quarterly_earnings"),
**kwargs,
):
postgreSQL_settings = {
"host": postresql_host,
"port": postresql_port,
"dbname": postresql_db,
"user": postresql_user,
"password": postresql_password,
}

files = pw.io.fs.read(
data_dir,
format="binary",
)
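    # Part 1: extract raw text from each document, have the LLM structure it into
    # FinancialStatementSchema rows, and persist them to PostgreSQL (plus a CSV copy).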
unstructured_documents = files.select(texts=extract_texts(pw.this.data))
structured_table = structure_on_the_fly(
unstructured_documents, api_key, model_locator, max_tokens, temperature
)
pw.io.postgres.write(structured_table, postgreSQL_settings, postresql_table)
pw.io.csv.write(structured_table, "./examples/data/quarterly_earnings.csv")

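    # Part 2: serve natural-language queries over REST, translating each into SQL
    # and executing it against the same PostgreSQL table.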
unstructured_query(
postgreSQL_settings,
postresql_table,
api_key,
model_locator,
max_tokens,
temperature,
host,
port,
)

pw.run(monitoring_level=pw.MonitoringLevel.NONE)


if __name__ == "__main__":
run()