Limit concurrency in pipeline #458

Open
Sebulba46 opened this issue Mar 6, 2025 · 0 comments

Sebulba46 commented Mar 6, 2025

I'm currently running a pipeline defined by a class Pipeline with a method init_db_connection, which is called on startup from async def on_startup(). Is there any way to limit concurrency?

from typing import List

import psycopg2
from pydantic import BaseModel


class Pipeline:
    class Valves(BaseModel):
        DB_HOST: str
        DB_PORT: str
        DB_USER: str
        DB_PASSWORD: str
        DB_DATABASE: str
        DB_TABLES: List[str]
        VLLM_HOST: str
        OPENAI_API_KEY: str
        TEXT_TO_SQL_MODEL: str

    def __init__(self):
        self.name = "01 Database RAG Pipeline vLLM llama"
        self.conn = None
        self.cur = None
        self.nlsql_response = ""

        self.valves = ...

    def init_db_connection(self):
        connection_params = {
            'dbname': self.valves.DB_DATABASE,
            'user': self.valves.DB_USER,
            'password': self.valves.DB_PASSWORD,
            'host': self.valves.DB_HOST.split('//')[-1],  # Remove the http:// or https:// prefix if present
            'port': self.valves.DB_PORT
        }

        try:
            self.conn = psycopg2.connect(**connection_params)
            print("Connection to PostgreSQL established successfully")
        except Exception as e:
            print(f"Error connecting to PostgreSQL: {e}")
            # Without a connection the queries below cannot run
            return

        # Create a cursor object
        self.cur = self.conn.cursor()

        # Query to get the list of tables
        self.cur.execute("""
            SELECT table_schema, table_name
            FROM information_schema.tables
            WHERE table_type = 'BASE TABLE'
            AND table_schema NOT IN ('information_schema', 'pg_catalog');
        """)

        # Fetch and print the table names
        tables = self.cur.fetchall()
        print("Tables in the database:")
        for schema, table in tables:
            print(f"{schema}.{table}")

        # Query to get the column names of public.do_10vc_speed
        self.cur.execute("""SELECT json_object_keys(to_json(json_populate_record(NULL::public.do_10vc_speed, '{}'::JSON)))""")

        # Fetch and print the column names
        columns = self.cur.fetchall()
        print("Columns in the table:")
        print(f"{columns}")

    async def on_startup(self):
        self.init_db_connection()

    async def on_shutdown(self):
        # Either attribute is still None if the connection failed at startup
        if self.cur is not None:
            self.cur.close()
        if self.conn is not None:
            self.conn.close()
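
For illustration only, here is a minimal sketch of one generic way to cap concurrency from inside the pipeline code: an asyncio.Semaphore wrapped around the blocking work. It does not assume that the pipelines server has a concurrency setting of its own. ConcurrencyLimiter, MAX_CONCURRENT and fake_query are hypothetical names; fake_query stands in for a blocking database call such as the ones in init_db_connection. asyncio.to_thread requires Python 3.9 or newer.

```python
import asyncio
import time

MAX_CONCURRENT = 2  # hypothetical limit, tune for the database


class ConcurrencyLimiter:
    """Allows at most `limit` wrapped calls to run at the same time."""

    def __init__(self, limit: int):
        self._sem = asyncio.Semaphore(limit)
        self.active = 0  # calls currently inside the semaphore
        self.peak = 0    # highest value `active` has reached

    async def run(self, func, *args):
        # Callers beyond the limit wait here until a slot frees up
        async with self._sem:
            self.active += 1
            self.peak = max(self.peak, self.active)
            try:
                # Run the blocking function in a worker thread so the
                # event loop stays responsive
                return await asyncio.to_thread(func, *args)
            finally:
                self.active -= 1


def fake_query(n: int) -> int:
    # Stand-in for a blocking database call (assumption)
    time.sleep(0.05)
    return n * 2


async def main():
    limiter = ConcurrencyLimiter(MAX_CONCURRENT)
    results = await asyncio.gather(
        *(limiter.run(fake_query, i) for i in range(6))
    )
    return results, limiter.peak


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In a pipeline like the one above, the limiter would presumably be created once (for example in on_startup) and shared by every request handler. Note that psycopg2 cursors are not thread-safe, so concurrent calls should each use their own cursor rather than the shared self.cur.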