From bad1d7d293c2921e761512aa08770ca201503938 Mon Sep 17 00:00:00 2001 From: luke <123917244@qq.com> Date: Fri, 26 Jul 2024 16:32:53 +0800 Subject: [PATCH 01/18] feat: init integration table --- .../1721982447267-add-integration-table.ts | 18 ++++++++++++ .../src/project/project.model.ts | 29 +++++++++++++++++-- apps/indexer-coordinator/src/project/types.ts | 5 ++++ 3 files changed, 50 insertions(+), 2 deletions(-) create mode 100644 apps/indexer-coordinator/src/migration/1721982447267-add-integration-table.ts diff --git a/apps/indexer-coordinator/src/migration/1721982447267-add-integration-table.ts b/apps/indexer-coordinator/src/migration/1721982447267-add-integration-table.ts new file mode 100644 index 000000000..01c32f987 --- /dev/null +++ b/apps/indexer-coordinator/src/migration/1721982447267-add-integration-table.ts @@ -0,0 +1,18 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: Apache-2.0 + +import { MigrationInterface, QueryRunner } from 'typeorm'; + +export class AddIntegrationTable1721982447267 implements MigrationInterface { + name = 'AddIntegrationTable1721982447267'; + + async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE TABLE "integration_entity" ("id" SERIAL NOT NULL, "title" character varying NOT NULL, "type" integer NOT NULL, "serviceEndpoints" jsonb NOT NULL DEFAULT '{}', "enabled" boolean NOT NULL DEFAULT false, "config" jsonb NOT NULL DEFAULT '{}', "extra" jsonb NOT NULL DEFAULT '{}', CONSTRAINT "PK_f42c182dafb16de2e24566c43e6" PRIMARY KEY ("id"))` + ); + } + + async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP TABLE "integration_entity"`); + } +} diff --git a/apps/indexer-coordinator/src/project/project.model.ts b/apps/indexer-coordinator/src/project/project.model.ts index 9c88fd442..c51690b79 100644 --- a/apps/indexer-coordinator/src/project/project.model.ts +++ b/apps/indexer-coordinator/src/project/project.model.ts @@ -2,8 +2,8 @@ // SPDX-License-Identifier: Apache-2.0 import { Field, ID, InputType, Int, ObjectType } from '@nestjs/graphql'; -import { Column, Entity, PrimaryColumn, BeforeInsert } from 'typeorm'; -import { AccessType, HostType, ProjectType } from './types'; +import { Column, Entity, PrimaryColumn, BeforeInsert, PrimaryGeneratedColumn } from 'typeorm'; +import { AccessType, HostType, IntegrationType, ProjectType } from './types'; // TODO: temp place to put these types @ObjectType('ProjectInfo') @@ -414,3 +414,28 @@ export class ProjectDetails extends ProjectEntity { @Field(() => Payg, { nullable: true }) payg?: Payg; } + +@Entity() +export class IntegrationEntity { + @PrimaryGeneratedColumn('increment') + id: number; + + @Column({ type: 'varchar' }) + title: string; + + @Column() + type: IntegrationType; + + @Column('jsonb', { default: {} }) + @Field(() => [SeviceEndpoint], { nullable: true }) + serviceEndpoints: SeviceEndpoint[]; + + @Column({ type: 'boolean', default: false }) + enabled: boolean; + + @Column('jsonb', { default: {} }) + config: any; + + @Column('jsonb', { default: {} }) + extra: any; +} diff --git a/apps/indexer-coordinator/src/project/types.ts b/apps/indexer-coordinator/src/project/types.ts index 47143b5cc..6e592cdf0 100644 --- a/apps/indexer-coordinator/src/project/types.ts +++ b/apps/indexer-coordinator/src/project/types.ts @@ -16,6 +16,11 @@ export enum ProjectType { SUBGRAPH, } +export enum IntegrationType { + SUBGRAPH, + LLM, +} + export enum HostType { UN_RESOLVED = 'un-resolved', SYSTEM_MANAGED = 'system-managed', From 89f4fc43bcb16eca36b65c7b749466b1040df7d2 Mon Sep 17 00:00:00 2001 From: luke <123917244@qq.com> Date: Fri, 26 Jul 2024 17:04:24 +0800 Subject: [PATCH 02/18] refactor: add a new module --- .../src/integration/integration.model.ts | 52 +++++++++++++++++++ .../src/integration/integration.module.ts | 16 ++++++ .../src/integration/integration.resolver.ts | 16 ++++++ .../src/integration/integration.service.ts | 23 ++++++++ .../src/project/project.model.ts | 25 --------- .../src/project/project.module.ts | 4 ++ .../src/project/project.ollama.service.ts | 29 +++++++++++ .../src/project/project.service.ts | 4 ++ apps/indexer-coordinator/src/project/types.ts | 1 + 9 files changed, 145 insertions(+), 25 deletions(-) create mode 100644 apps/indexer-coordinator/src/integration/integration.model.ts create mode 100644 apps/indexer-coordinator/src/integration/integration.module.ts create mode 100644 apps/indexer-coordinator/src/integration/integration.resolver.ts create mode 100644 apps/indexer-coordinator/src/integration/integration.service.ts create mode 100644 apps/indexer-coordinator/src/project/project.ollama.service.ts diff --git a/apps/indexer-coordinator/src/integration/integration.model.ts b/apps/indexer-coordinator/src/integration/integration.model.ts new file mode 100644 index 000000000..49510bdd0 --- /dev/null +++ b/apps/indexer-coordinator/src/integration/integration.model.ts @@ -0,0 +1,52 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: Apache-2.0 + +import { ID, Field, Int, ObjectType } from '@nestjs/graphql'; +import { IntegrationType } from '../project/types'; +import { + Column, + Entity, + Index, + PrimaryGeneratedColumn, + CreateDateColumn, + UpdateDateColumn, +} from 'typeorm'; +import { SeviceEndpoint } from '../project/project.model'; + +@Entity('integration') +@ObjectType() +export class IntegrationEntity { + @PrimaryGeneratedColumn('increment') + @Field(() => ID, { nullable: true }) + id: number; + + @Column({ type: 'varchar' }) + @Field() + title: string; + + @Column() + @Field() + type: IntegrationType; + + @Column('jsonb', { default: {} }) + @Field(() => [SeviceEndpoint], { nullable: true }) + serviceEndpoints: SeviceEndpoint[]; + + @Column({ type: 'boolean', default: false }) + @Field() + enabled: boolean; + + @Column('jsonb', { default: {} }) + config: any; + + @Column('jsonb', { default: {} }) + extra: any; + + @CreateDateColumn({ type: 'timestamp', default: () => 'CURRENT_TIMESTAMP' }) + @Field({ nullable: true }) + created_at: Date; + + @UpdateDateColumn({ type: 'timestamp', default: () => 'CURRENT_TIMESTAMP' }) + @Field({ nullable: true }) + updated_at: Date; +} diff --git a/apps/indexer-coordinator/src/integration/integration.module.ts b/apps/indexer-coordinator/src/integration/integration.module.ts new file mode 100644 index 000000000..46724e718 --- /dev/null +++ b/apps/indexer-coordinator/src/integration/integration.module.ts @@ -0,0 +1,16 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: Apache-2.0 + +import { Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; +import { IntegrationEntity } from './integration.model'; +import { IntegrationResolver } from './integration.resolver'; +import { IntegrationService } from './integration.service'; + +@Module({ + imports: [TypeOrmModule.forFeature([IntegrationEntity])], + providers: [IntegrationService, IntegrationResolver], + exports: [IntegrationService], +}) +@Module({}) +export class IntegrationModule {} diff --git a/apps/indexer-coordinator/src/integration/integration.resolver.ts b/apps/indexer-coordinator/src/integration/integration.resolver.ts new file mode 100644 index 000000000..62aad6a9a --- /dev/null +++ b/apps/indexer-coordinator/src/integration/integration.resolver.ts @@ -0,0 +1,16 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: Apache-2.0 + +import { Args, Mutation, Query, Resolver } from '@nestjs/graphql'; +import { IntegrationEntity } from './integration.model'; +import { IntegrationService } from './integration.service'; + +@Resolver(() => IntegrationEntity) +export class IntegrationResolver { + constructor(private integrationService: IntegrationService) {} + + @Query(() => [IntegrationEntity]) + async allIntegration(): Promise { + return await this.integrationService.getAll(); + } +} diff --git a/apps/indexer-coordinator/src/integration/integration.service.ts b/apps/indexer-coordinator/src/integration/integration.service.ts new file mode 100644 index 000000000..1dd2f8560 --- /dev/null +++ b/apps/indexer-coordinator/src/integration/integration.service.ts @@ -0,0 +1,23 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: Apache-2.0 + +import { Injectable } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Repository, In } from 'typeorm'; +import { IntegrationEntity } from './integration.model'; + +@Injectable() +export class IntegrationService { + constructor( + @InjectRepository(IntegrationEntity) + private integrationRepo: Repository + ) {} + + async get(id: number): Promise { + return this.integrationRepo.findOne({ where: { id } }); + } + + async getAll(): Promise { + return this.integrationRepo.find(); + } +} diff --git a/apps/indexer-coordinator/src/project/project.model.ts b/apps/indexer-coordinator/src/project/project.model.ts index c51690b79..d45d249cd 100644 --- a/apps/indexer-coordinator/src/project/project.model.ts +++ b/apps/indexer-coordinator/src/project/project.model.ts @@ -414,28 +414,3 @@ export class ProjectDetails extends ProjectEntity { @Field(() => Payg, { nullable: true }) payg?: Payg; } - -@Entity() -export class IntegrationEntity { - @PrimaryGeneratedColumn('increment') - id: number; - - @Column({ type: 'varchar' }) - title: string; - - @Column() - type: IntegrationType; - - @Column('jsonb', { default: {} }) - @Field(() => [SeviceEndpoint], { nullable: true }) - serviceEndpoints: SeviceEndpoint[]; - - @Column({ type: 'boolean', default: false }) - enabled: boolean; - - @Column('jsonb', { default: {} }) - config: any; - - @Column('jsonb', { default: {} }) - extra: any; -} diff --git a/apps/indexer-coordinator/src/project/project.module.ts b/apps/indexer-coordinator/src/project/project.module.ts index 42b95843f..392984d99 100644 --- a/apps/indexer-coordinator/src/project/project.module.ts +++ b/apps/indexer-coordinator/src/project/project.module.ts @@ -17,6 +17,8 @@ import { ProjectResolver } from './project.resolver'; import { ProjectRpcService } from './project.rpc.service'; import { ProjectService } from './project.service'; import { ProjectSubgraphService } from './project.subgraph.service'; +import { ProjectOllamaService } from './project.ollama.service'; +import { IntegrationModule } from 'src/integration/integration.module'; @Module({ imports: [ @@ -26,6 +28,7 @@ import { ProjectSubgraphService } from './project.subgraph.service'; MetricsModule, TypeOrmModule.forFeature([ProjectEntity, PaygEntity]), ConfigModule, + IntegrationModule, ], providers: [ ProjectService, @@ -33,6 +36,7 @@ import { ProjectSubgraphService } from './project.subgraph.service'; ProjectResolver, ProjectRpcService, ProjectSubgraphService, + ProjectOllamaService, DbStatsService, ], exports: [ProjectService, ProjectRpcService], diff --git a/apps/indexer-coordinator/src/project/project.ollama.service.ts b/apps/indexer-coordinator/src/project/project.ollama.service.ts new file mode 100644 index 000000000..449ba407d --- /dev/null +++ b/apps/indexer-coordinator/src/project/project.ollama.service.ts @@ -0,0 +1,29 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: Apache-2.0 + +import { Injectable } from '@nestjs/common'; +import { Cron } from '@nestjs/schedule'; +import { InjectRepository } from '@nestjs/typeorm'; +import { DesiredStatus } from 'src/core/types'; +import { getLogger } from 'src/utils/logger'; +import { getDomain, getIpAddress, isIp, isPrivateIp } from 'src/utils/network'; +import { Repository } from 'typeorm'; +import { RpcManifest } from './project.manifest'; +import { + IProjectConfig, + MetadataType, + Project, + ProjectEntity, + SeviceEndpoint, + ValidationResponse, +} from './project.model'; +import { ProjectService } from './project.service'; +import { RequiredRpcType, getRpcFamilyObject } from './rpc.factory'; +import { AccessType, ProjectType } from './types'; + +const logger = getLogger('project.ollama.service'); + +@Injectable() +export class ProjectOllamaService { + constructor() {} +} diff --git a/apps/indexer-coordinator/src/project/project.service.ts b/apps/indexer-coordinator/src/project/project.service.ts index 1731558be..df0561541 100644 --- a/apps/indexer-coordinator/src/project/project.service.ts +++ b/apps/indexer-coordinator/src/project/project.service.ts @@ -260,6 +260,10 @@ export class ProjectService { manifest, }); + if(manifest?.kind === 'LLM') { + return this.projectRepo.save(projectEntity); + } + // load default payg config const flexConfig = await this.configService.getFlexConfig(); diff --git a/apps/indexer-coordinator/src/project/types.ts b/apps/indexer-coordinator/src/project/types.ts index 6e592cdf0..61fbfb253 100644 --- a/apps/indexer-coordinator/src/project/types.ts +++ b/apps/indexer-coordinator/src/project/types.ts @@ -14,6 +14,7 @@ export enum ProjectType { RPC, DICTIONARY, SUBGRAPH, + LLM, } export enum IntegrationType { From 5ac76b9d744fe16030313002ac9c8525e086ff9a Mon Sep 17 00:00:00 2001 From: luke <123917244@qq.com> Date: Fri, 26 Jul 2024 17:37:23 +0800 Subject: [PATCH 03/18] refactor: refactor --- .../src/integration/integration.model.ts | 2 +- .../src/integration/integration.resolver.ts | 11 +++++++++++ .../src/integration/integration.service.ts | 17 +++++++++++++++++ ...ollama.service.ts => project.llm.service.ts} | 15 +++++++++++++-- .../src/project/project.module.ts | 4 ++-- .../src/project/project.resolver.ts | 4 ++++ .../src/project/project.service.ts | 4 ---- 7 files changed, 48 insertions(+), 9 deletions(-) rename apps/indexer-coordinator/src/project/{project.ollama.service.ts => project.llm.service.ts} (74%) diff --git a/apps/indexer-coordinator/src/integration/integration.model.ts b/apps/indexer-coordinator/src/integration/integration.model.ts index 49510bdd0..29b184a22 100644 --- a/apps/indexer-coordinator/src/integration/integration.model.ts +++ b/apps/indexer-coordinator/src/integration/integration.model.ts @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 import { ID, Field, Int, ObjectType } from '@nestjs/graphql'; -import { IntegrationType } from '../project/types'; import { Column, Entity, @@ -12,6 +11,7 @@ import { UpdateDateColumn, } from 'typeorm'; import { SeviceEndpoint } from '../project/project.model'; +import { IntegrationType } from '../project/types'; @Entity('integration') @ObjectType() diff --git a/apps/indexer-coordinator/src/integration/integration.resolver.ts b/apps/indexer-coordinator/src/integration/integration.resolver.ts index 62aad6a9a..a45b06100 100644 --- a/apps/indexer-coordinator/src/integration/integration.resolver.ts +++ b/apps/indexer-coordinator/src/integration/integration.resolver.ts @@ -2,6 +2,8 @@ // SPDX-License-Identifier: Apache-2.0 import { Args, Mutation, Query, Resolver } from '@nestjs/graphql'; +import { SeviceEndpoint } from '../project/project.model'; +import { IntegrationType } from '../project/types'; import { IntegrationEntity } from './integration.model'; import { IntegrationService } from './integration.service'; @@ -13,4 +15,13 @@ export class IntegrationResolver { async allIntegration(): Promise { return await this.integrationService.getAll(); } + + @Mutation(() => IntegrationEntity) + addIntegration( + @Args('title') title: string, + @Args('type') type: IntegrationType, + @Args('serviceEndpoints') serviceEndpoints: SeviceEndpoint[] + ): Promise { + return this.integrationService.create(title, type, serviceEndpoints); + } } diff --git a/apps/indexer-coordinator/src/integration/integration.service.ts b/apps/indexer-coordinator/src/integration/integration.service.ts index 1dd2f8560..bf5b64891 100644 --- a/apps/indexer-coordinator/src/integration/integration.service.ts +++ b/apps/indexer-coordinator/src/integration/integration.service.ts @@ -4,6 +4,8 @@ import { Injectable } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository, In } from 'typeorm'; +import { SeviceEndpoint } from '../project/project.model'; +import { IntegrationType } from '../project/types'; import { IntegrationEntity } from './integration.model'; @Injectable() @@ -20,4 +22,19 @@ export class IntegrationService { async getAll(): Promise { return this.integrationRepo.find(); } + + async create( + title: string, + type: IntegrationType, + serviceEndpoints: SeviceEndpoint[] + ): Promise { + // todo: check if title exists + const integration = new IntegrationEntity(); + integration.title = title; + integration.type = type; + integration.serviceEndpoints = serviceEndpoints; + + // todo: check if serviceEndpoints are valid + return this.integrationRepo.save(integration); + } } diff --git a/apps/indexer-coordinator/src/project/project.ollama.service.ts b/apps/indexer-coordinator/src/project/project.llm.service.ts similarity index 74% rename from apps/indexer-coordinator/src/project/project.ollama.service.ts rename to apps/indexer-coordinator/src/project/project.llm.service.ts index 449ba407d..6eb5a14b2 100644 --- a/apps/indexer-coordinator/src/project/project.ollama.service.ts +++ b/apps/indexer-coordinator/src/project/project.llm.service.ts @@ -21,9 +21,20 @@ import { ProjectService } from './project.service'; import { RequiredRpcType, getRpcFamilyObject } from './rpc.factory'; import { AccessType, ProjectType } from './types'; -const logger = getLogger('project.ollama.service'); +const logger = getLogger('project.llm.service'); @Injectable() -export class ProjectOllamaService { +export class ProjectLLMService { constructor() {} + + async startLLMProject( + id: string, + projectConfig: IProjectConfig, + rateLimit: number + ): Promise { + // check ollama model exists + + // ollama run + return new Project(); + } } diff --git a/apps/indexer-coordinator/src/project/project.module.ts b/apps/indexer-coordinator/src/project/project.module.ts index 392984d99..e46dbdae6 100644 --- a/apps/indexer-coordinator/src/project/project.module.ts +++ b/apps/indexer-coordinator/src/project/project.module.ts @@ -17,7 +17,7 @@ import { ProjectResolver } from './project.resolver'; import { ProjectRpcService } from './project.rpc.service'; import { ProjectService } from './project.service'; import { ProjectSubgraphService } from './project.subgraph.service'; -import { ProjectOllamaService } from './project.ollama.service'; +import { ProjectLLMService } from './project.llm.service'; import { IntegrationModule } from 'src/integration/integration.module'; @Module({ @@ -36,7 +36,7 @@ import { IntegrationModule } from 'src/integration/integration.module'; ProjectResolver, ProjectRpcService, ProjectSubgraphService, - ProjectOllamaService, + ProjectLLMService, DbStatsService, ], exports: [ProjectService, ProjectRpcService], diff --git a/apps/indexer-coordinator/src/project/project.resolver.ts b/apps/indexer-coordinator/src/project/project.resolver.ts index 3079e3cd8..722aa37d2 100644 --- a/apps/indexer-coordinator/src/project/project.resolver.ts +++ b/apps/indexer-coordinator/src/project/project.resolver.ts @@ -8,6 +8,7 @@ import { QueryService } from '../core/query.service'; import { SubscriptionService } from '../subscription/subscription.service'; import { ProjectEvent } from '../utils/subscription'; import { DbStatsService } from './db.stats.service'; +import { ProjectLLMService } from './project.llm.service'; import { AggregatedManifest, RpcManifest, SubgraphManifest } from './project.manifest'; import { LogType, @@ -42,6 +43,7 @@ export class ProjectResolver { private projectService: ProjectService, private projectRpcService: ProjectRpcService, private projectSubgraphService: ProjectSubgraphService, + private projectLLMService: ProjectLLMService, private queryService: QueryService, private dockerRegistry: DockerRegistryService, private pubSub: SubscriptionService, @@ -286,6 +288,8 @@ export class ProjectResolver { return this.projectRpcService.startRpcProject(id, projectConfig, rateLimit ?? 0); case ProjectType.SUBGRAPH: return this.projectSubgraphService.startSubgraphProject(id, projectConfig, rateLimit ?? 0); + case ProjectType.LLM: + return this.projectLLMService.startLLMProject(id, projectConfig, rateLimit ?? 0); default: throw new Error(`Unknown project type ${projectType}`); } diff --git a/apps/indexer-coordinator/src/project/project.service.ts b/apps/indexer-coordinator/src/project/project.service.ts index df0561541..1731558be 100644 --- a/apps/indexer-coordinator/src/project/project.service.ts +++ b/apps/indexer-coordinator/src/project/project.service.ts @@ -260,10 +260,6 @@ export class ProjectService { manifest, }); - if(manifest?.kind === 'LLM') { - return this.projectRepo.save(projectEntity); - } - // load default payg config const flexConfig = await this.configService.getFlexConfig(); From e3b8dd6cb02041259cd8efc196bcab8cdb422b19 Mon Sep 17 00:00:00 2001 From: luke <123917244@qq.com> Date: Mon, 29 Jul 2024 16:25:32 +0800 Subject: [PATCH 04/18] feat: start llm project --- .../src/integration/integration.model.ts | 5 +- .../src/integration/integration.module.ts | 6 +- .../src/integration/integration.resolver.ts | 23 +++++- .../src/integration/integration.service.ts | 78 +++++++++++++++++-- .../1721982447267-add-integration-table.ts | 8 +- .../src/project/project.llm.service.ts | 66 +++++++++++----- .../src/project/project.manifest.ts | 40 ++++++++++ .../src/project/project.module.ts | 4 +- .../src/project/project.resolver.ts | 7 +- apps/indexer-coordinator/src/project/types.ts | 18 +++++ .../src/utils/subscription.ts | 4 + 11 files changed, 221 insertions(+), 38 deletions(-) diff --git a/apps/indexer-coordinator/src/integration/integration.model.ts b/apps/indexer-coordinator/src/integration/integration.model.ts index 29b184a22..37cffacf6 100644 --- a/apps/indexer-coordinator/src/integration/integration.model.ts +++ b/apps/indexer-coordinator/src/integration/integration.model.ts @@ -1,7 +1,7 @@ // Copyright 2020-2024 SubQuery Pte Ltd authors & contributors // SPDX-License-Identifier: Apache-2.0 -import { ID, Field, Int, ObjectType } from '@nestjs/graphql'; +import { ID, Field, ObjectType } from '@nestjs/graphql'; import { Column, Entity, @@ -14,13 +14,14 @@ import { SeviceEndpoint } from '../project/project.model'; import { IntegrationType } from '../project/types'; @Entity('integration') +@Index(['title'], { unique: true }) @ObjectType() export class IntegrationEntity { @PrimaryGeneratedColumn('increment') @Field(() => ID, { nullable: true }) id: number; - @Column({ type: 'varchar' }) + @Column({ type: 'varchar', length: 50 }) @Field() title: string; diff --git a/apps/indexer-coordinator/src/integration/integration.module.ts b/apps/indexer-coordinator/src/integration/integration.module.ts index 46724e718..56359d3ef 100644 --- a/apps/indexer-coordinator/src/integration/integration.module.ts +++ b/apps/indexer-coordinator/src/integration/integration.module.ts @@ -3,12 +3,16 @@ import { Module } from '@nestjs/common'; import { TypeOrmModule } from '@nestjs/typeorm'; +import { ProjectModule } from '../project/project.module'; import { IntegrationEntity } from './integration.model'; import { IntegrationResolver } from './integration.resolver'; import { IntegrationService } from './integration.service'; @Module({ - imports: [TypeOrmModule.forFeature([IntegrationEntity])], + imports: [ + TypeOrmModule.forFeature([IntegrationEntity]), + ProjectModule + ], providers: [IntegrationService, IntegrationResolver], exports: [IntegrationService], }) diff --git a/apps/indexer-coordinator/src/integration/integration.resolver.ts b/apps/indexer-coordinator/src/integration/integration.resolver.ts index a45b06100..bf8f7f6bc 100644 --- a/apps/indexer-coordinator/src/integration/integration.resolver.ts +++ b/apps/indexer-coordinator/src/integration/integration.resolver.ts @@ -3,7 +3,7 @@ import { Args, Mutation, Query, Resolver } from '@nestjs/graphql'; import { SeviceEndpoint } from '../project/project.model'; -import { IntegrationType } from '../project/types'; +import { IntegrationType, LLMConfig, LLMExtra } from '../project/types'; import { IntegrationEntity } from './integration.model'; import { IntegrationService } from './integration.service'; @@ -20,8 +20,25 @@ export class IntegrationResolver { addIntegration( @Args('title') title: string, @Args('type') type: IntegrationType, - @Args('serviceEndpoints') serviceEndpoints: SeviceEndpoint[] + @Args('serviceEndpoints', { type: () => [SeviceEndpoint] }) + serviceEndpoints: SeviceEndpoint[], + + @Args('config', { nullable: true }) config?: LLMConfig, + @Args('extra', { nullable: true }) extra?: LLMExtra + ): Promise { + return this.integrationService.create(title, type, serviceEndpoints, config, extra); + } + + @Mutation(() => IntegrationEntity) + updateIntegration( + @Args('id') id: number, + @Args('title') title: string, + @Args('serviceEndpoints', { type: () => [SeviceEndpoint] }) + serviceEndpoints: SeviceEndpoint[], + @Args('enabled') enabled: boolean, + @Args('config', { nullable: true }) config?: LLMConfig, + @Args('extra', { nullable: true }) extra?: LLMExtra ): Promise { - return this.integrationService.create(title, type, serviceEndpoints); + return this.integrationService.update(id, title, serviceEndpoints, enabled, config, extra); } } diff --git a/apps/indexer-coordinator/src/integration/integration.service.ts b/apps/indexer-coordinator/src/integration/integration.service.ts index bf5b64891..ffb5042da 100644 --- a/apps/indexer-coordinator/src/integration/integration.service.ts +++ b/apps/indexer-coordinator/src/integration/integration.service.ts @@ -3,16 +3,18 @@ import { Injectable } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; -import { Repository, In } from 'typeorm'; -import { SeviceEndpoint } from '../project/project.model'; -import { IntegrationType } from '../project/types'; +import { Repository } from 'typeorm'; +import { ProjectLLMService } from '../project/project.llm.service'; +import { SeviceEndpoint, ValidationResponse } from '../project/project.model'; +import { IntegrationType, LLMConfig, LLMExtra } from '../project/types'; import { IntegrationEntity } from './integration.model'; @Injectable() export class IntegrationService { constructor( @InjectRepository(IntegrationEntity) - private integrationRepo: Repository + private integrationRepo: Repository, + private projectLLMService: ProjectLLMService ) {} async get(id: number): Promise { @@ -26,15 +28,75 @@ export class IntegrationService { async create( title: string, type: IntegrationType, - serviceEndpoints: SeviceEndpoint[] + serviceEndpoints: SeviceEndpoint[], + config?: LLMConfig, + extra?: LLMExtra ): Promise { - // todo: check if title exists - const integration = new IntegrationEntity(); + let integration = await this.integrationRepo.findOne({ where: { title } }); + if (integration) { + throw new Error(`${title} already exist`); + } + + let validateResult: ValidationResponse = { valid: true, reason: '' }; + switch (type) { + case IntegrationType.LLM: + validateResult = await this.projectLLMService.validate(serviceEndpoints[0].value); + break; + default: + throw new Error('Unsupported integration type'); + } + + if (!validateResult.valid) { + throw new Error(validateResult.reason); + } + + integration = new IntegrationEntity(); integration.title = title; integration.type = type; + integration.enabled = true; integration.serviceEndpoints = serviceEndpoints; + integration.config = config || {}; + integration.extra = extra || {}; + return this.integrationRepo.save(integration); + } - // todo: check if serviceEndpoints are valid + async update( + id: number, + title: string, + serviceEndpoints: SeviceEndpoint[], + enabled: boolean, + config?: LLMConfig, + extra?: LLMExtra + ): Promise { + let integration = await this.integrationRepo.findOne({ where: { title } }); + if (integration && integration.id !== id) { + throw new Error(`${title} already exist`); + } + + integration = await this.integrationRepo.findOne({ where: { id } }); + if (!integration) { + throw new Error(`${id} not exist`); + } + + let validateResult: ValidationResponse = { valid: true, reason: '' }; + switch (integration.type) { + case IntegrationType.LLM: + validateResult = await this.projectLLMService.validate(serviceEndpoints[0].value); + break; + default: + throw new Error('Unsupported integration type'); + } + + if (!validateResult.valid) { + throw new Error(validateResult.reason); + } + + integration.title = title; + integration.enabled = true; + integration.serviceEndpoints = serviceEndpoints; + integration.enabled = enabled; + integration.config = config || {}; + integration.extra = extra || {}; return this.integrationRepo.save(integration); } } diff --git a/apps/indexer-coordinator/src/migration/1721982447267-add-integration-table.ts b/apps/indexer-coordinator/src/migration/1721982447267-add-integration-table.ts index 01c32f987..4df5d594b 100644 --- a/apps/indexer-coordinator/src/migration/1721982447267-add-integration-table.ts +++ b/apps/indexer-coordinator/src/migration/1721982447267-add-integration-table.ts @@ -8,11 +8,15 @@ export class AddIntegrationTable1721982447267 implements MigrationInterface { async up(queryRunner: QueryRunner): Promise { await queryRunner.query( - `CREATE TABLE "integration_entity" ("id" SERIAL NOT NULL, "title" character varying NOT NULL, "type" integer NOT NULL, "serviceEndpoints" jsonb NOT NULL DEFAULT '{}', "enabled" boolean NOT NULL DEFAULT false, "config" jsonb NOT NULL DEFAULT '{}', "extra" jsonb NOT NULL DEFAULT '{}', CONSTRAINT "PK_f42c182dafb16de2e24566c43e6" PRIMARY KEY ("id"))` + `CREATE TABLE "integration" ("id" SERIAL NOT NULL, "title" character varying(50) NOT NULL, "type" integer NOT NULL, "serviceEndpoints" jsonb NOT NULL DEFAULT '{}', "enabled" boolean NOT NULL DEFAULT false, "config" jsonb NOT NULL DEFAULT '{}', "extra" jsonb NOT NULL DEFAULT '{}', "created_at" TIMESTAMP NOT NULL DEFAULT now(), "updated_at" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "PK_f348d4694945d9dc4c7049a178a" PRIMARY KEY ("id"))` + ); + await queryRunner.query( + `CREATE UNIQUE INDEX "IDX_814dc61a29c5383dc90993603f" ON "integration" ("title") ` ); } async down(queryRunner: QueryRunner): Promise { - await queryRunner.query(`DROP TABLE "integration_entity"`); + await queryRunner.query(`DROP INDEX "public"."IDX_814dc61a29c5383dc90993603f"`); + await queryRunner.query(`DROP TABLE "integration"`); } } diff --git a/apps/indexer-coordinator/src/project/project.llm.service.ts b/apps/indexer-coordinator/src/project/project.llm.service.ts index 6eb5a14b2..fd1c8c246 100644 --- a/apps/indexer-coordinator/src/project/project.llm.service.ts +++ b/apps/indexer-coordinator/src/project/project.llm.service.ts @@ -2,39 +2,69 @@ // SPDX-License-Identifier: Apache-2.0 import { Injectable } from '@nestjs/common'; -import { Cron } from '@nestjs/schedule'; import { InjectRepository } from '@nestjs/typeorm'; -import { DesiredStatus } from 'src/core/types'; +import { Ollama } from 'ollama'; import { getLogger } from 'src/utils/logger'; -import { getDomain, getIpAddress, isIp, isPrivateIp } from 'src/utils/network'; +import { OllamaEvent } from 'src/utils/subscription'; import { Repository } from 'typeorm'; -import { RpcManifest } from './project.manifest'; -import { - IProjectConfig, - MetadataType, - Project, - ProjectEntity, - SeviceEndpoint, - ValidationResponse, -} from './project.model'; +import { SubscriptionService } from '../subscription/subscription.service'; +import { LLMManifest } from './project.manifest'; +import { IProjectConfig, Project, ProjectEntity, ValidationResponse } from './project.model'; import { ProjectService } from './project.service'; -import { RequiredRpcType, getRpcFamilyObject } from './rpc.factory'; -import { AccessType, ProjectType } from './types'; const logger = getLogger('project.llm.service'); @Injectable() export class ProjectLLMService { - constructor() {} + constructor( + @InjectRepository(ProjectEntity) private projectRepo: Repository, + private projectService: ProjectService, + private pubSub: SubscriptionService + ) {} async startLLMProject( id: string, projectConfig: IProjectConfig, rateLimit: number ): Promise { - // check ollama model exists + let project = await this.projectService.getProject(id); + if (!project) { + project = await this.projectService.addProject(id); + } + const endpoints = projectConfig.serviceEndpoints; + const host = endpoints[0].value; - // ollama run - return new Project(); + const manifest = project.manifest as LLMManifest; + const targetModel = manifest.model.name; + + try { + const ollama = new Ollama({ host }); + const allModels = await ollama.list(); + + const model = allModels?.models?.find((m) => m.name === targetModel); + if (!model) { + ollama.pull({ model: targetModel, stream: true }).then(async (stream) => { + for await (const part of stream) { + // console.log(part); + this.pubSub.publish(OllamaEvent.PullProgress, part); + } + }); + } + } catch (err) { + logger.error(`validate llm host: ${host} failed: ${err.message}`); + throw new Error(`Failed to start LLM project: ${err.message}`); + } + return project; + } + + async validate(host): Promise { + try { + const ollama = new Ollama({ host }); + await ollama.list(); + return { valid: true, reason: '' }; + } catch (err) { + logger.error(`validate llm host: ${host} failed: ${err.message}`); + return { valid: false, reason: err.message }; + } } } diff --git a/apps/indexer-coordinator/src/project/project.manifest.ts b/apps/indexer-coordinator/src/project/project.manifest.ts index b12255ee2..1d2cc2c8c 100644 --- a/apps/indexer-coordinator/src/project/project.manifest.ts +++ b/apps/indexer-coordinator/src/project/project.manifest.ts @@ -95,6 +95,46 @@ export class SubgraphManifest { computeUnit?: ComputeUnitClass[]; } +@ObjectType('ModelClass') +class ModelClass { + @Field(() => String, { nullable: true }) + name?: string; + @Field(() => String, { nullable: true }) + file?: string; +} + +@ObjectType('ParameterClass') +class ParameterClass { + @Field(() => Number, { nullable: true }) + temperature?: number; + @Field(() => Number, { nullable: true }) + num_ctx?: number; +} + +@ObjectType('RunnerClass') +class RunnerClass { + @Field(() => String, { nullable: true }) + name?: string; + @Field(() => ParameterClass, { nullable: true }) + parameter?: ParameterClass; + @Field(() => String, { nullable: true }) + system?: string; +} + +@ObjectType('LLMManifest') +export class LLMManifest { + @Field(() => String, { nullable: true }) + kind?: string; + @Field(() => String, { nullable: true }) + specVersion?: string; + + @Field(() => ModelClass, { nullable: true }) + model: ModelClass; + + @Field(() => RunnerClass, { nullable: true }) + runner?: RunnerClass; +} + @ObjectType('AggregatedManifest') export class AggregatedManifest { @Field(() => SubqueryManifest, { nullable: true }) diff --git a/apps/indexer-coordinator/src/project/project.module.ts b/apps/indexer-coordinator/src/project/project.module.ts index e46dbdae6..72a1a9d45 100644 --- a/apps/indexer-coordinator/src/project/project.module.ts +++ b/apps/indexer-coordinator/src/project/project.module.ts @@ -18,7 +18,6 @@ import { ProjectRpcService } from './project.rpc.service'; import { ProjectService } from './project.service'; import { ProjectSubgraphService } from './project.subgraph.service'; import { ProjectLLMService } from './project.llm.service'; -import { IntegrationModule } from 'src/integration/integration.module'; @Module({ imports: [ @@ -28,7 +27,6 @@ import { IntegrationModule } from 'src/integration/integration.module'; MetricsModule, TypeOrmModule.forFeature([ProjectEntity, PaygEntity]), ConfigModule, - IntegrationModule, ], providers: [ ProjectService, @@ -39,6 +37,6 @@ import { IntegrationModule } from 'src/integration/integration.module'; ProjectLLMService, DbStatsService, ], - exports: [ProjectService, ProjectRpcService], + exports: [ProjectService, ProjectRpcService, ProjectLLMService], }) export class ProjectModule {} diff --git a/apps/indexer-coordinator/src/project/project.resolver.ts b/apps/indexer-coordinator/src/project/project.resolver.ts index 722aa37d2..15636b766 100644 --- a/apps/indexer-coordinator/src/project/project.resolver.ts +++ b/apps/indexer-coordinator/src/project/project.resolver.ts @@ -6,7 +6,7 @@ import { Args, Mutation, Query, Resolver, Subscription } from '@nestjs/graphql'; import { DockerRegistry, DockerRegistryService } from '../core/docker.registry.service'; import { QueryService } from '../core/query.service'; import { SubscriptionService } from '../subscription/subscription.service'; -import { ProjectEvent } from '../utils/subscription'; +import { OllamaEvent, ProjectEvent } from '../utils/subscription'; import { DbStatsService } from './db.stats.service'; import { ProjectLLMService } from './project.llm.service'; import { AggregatedManifest, RpcManifest, SubgraphManifest } from './project.manifest'; @@ -369,4 +369,9 @@ export class ProjectResolver { async getProjectDbSize(@Args('id') id: string): Promise { return (await this.dbStatsService.getProjectDbStats(id)).size || '0'; } + + @Subscription(() => String) + progressChanged() { + return this.pubSub.asyncIterator(OllamaEvent.PullProgress); + } } diff --git a/apps/indexer-coordinator/src/project/types.ts b/apps/indexer-coordinator/src/project/types.ts index 61fbfb253..c717f12f4 100644 --- a/apps/indexer-coordinator/src/project/types.ts +++ b/apps/indexer-coordinator/src/project/types.ts @@ -22,6 +22,20 @@ export enum IntegrationType { LLM, } +@InputType() +@ObjectType() +export class LLMConfig { + @Field() + foo: string; +} + +@InputType() +@ObjectType() +export class LLMExtra { + @Field() + bar: string; +} + export enum HostType { UN_RESOLVED = 'un-resolved', SYSTEM_MANAGED = 'system-managed', @@ -66,6 +80,10 @@ export enum SubgraphEndpointType { MetricsEndpoint = 'metrics-endpoint', } +export enum OllamaEndpointType { + HttpEndpoint = 'http-endpoint', +} + export const SubgraphEndpointAccessType = { [SubgraphEndpointType.HttpEndpoint]: AccessType.DEFAULT, [SubgraphEndpointType.WsEndpoint]: AccessType.DEFAULT, diff --git a/apps/indexer-coordinator/src/utils/subscription.ts b/apps/indexer-coordinator/src/utils/subscription.ts index 7b83001d5..899a6c8b2 100644 --- a/apps/indexer-coordinator/src/utils/subscription.ts +++ b/apps/indexer-coordinator/src/utils/subscription.ts @@ -16,3 +16,7 @@ export enum AccountEvent { Indexer = 'account_indexer', Controller = 'account_controller', } + +export enum OllamaEvent { + PullProgress = 'pull_progress', +} \ No newline at end of file From 278b1dcde150feece15aab5bf0232fb60a5663b8 Mon Sep 17 00:00:00 2001 From: luke <123917244@qq.com> Date: Mon, 29 Jul 2024 17:08:12 +0800 Subject: [PATCH 05/18] feat: stop/remove llm project --- .../src/project/project.llm.service.ts | 25 +++++++++++++++++++ .../src/project/project.resolver.ts | 4 +++ 2 files changed, 29 insertions(+) diff --git a/apps/indexer-coordinator/src/project/project.llm.service.ts b/apps/indexer-coordinator/src/project/project.llm.service.ts index fd1c8c246..f7cfbaaf0 100644 --- a/apps/indexer-coordinator/src/project/project.llm.service.ts +++ b/apps/indexer-coordinator/src/project/project.llm.service.ts @@ -11,6 +11,7 @@ import { SubscriptionService } from '../subscription/subscription.service'; import { LLMManifest } from './project.manifest'; import { IProjectConfig, Project, ProjectEntity, ValidationResponse } from './project.model'; import { ProjectService } from './project.service'; +import { DesiredStatus } from 'src/core/types'; const logger = getLogger('project.llm.service'); @@ -57,6 +58,30 @@ export class ProjectLLMService { return project; } + async stopLLMProject(id: string): Promise { + const project = await this.projectService.getProject(id); + if (!project) { + return; + } + project.status = DesiredStatus.STOPPED; + return this.projectRepo.save(project); + } + + async removeLLMProject(id: string): Promise { + const project = await this.projectService.getProject(id); + if (!project) return []; + const endpoints = project.serviceEndpoints; + const host = endpoints[0].value; + + const manifest = project.manifest as LLMManifest; + const targetModel = manifest.model.name; + + const ollama = new Ollama({ host }); + await ollama.delete({ model: targetModel }); + + return this.projectRepo.remove([project]); + } + async validate(host): Promise { try { const ollama = new Ollama({ host }); diff --git a/apps/indexer-coordinator/src/project/project.resolver.ts b/apps/indexer-coordinator/src/project/project.resolver.ts index 15636b766..fe767058b 100644 --- a/apps/indexer-coordinator/src/project/project.resolver.ts +++ b/apps/indexer-coordinator/src/project/project.resolver.ts @@ -310,6 +310,8 @@ export class ProjectResolver { return this.projectRpcService.stopRpcProject(id); case ProjectType.SUBGRAPH: return this.projectSubgraphService.stopSubgraphProject(id); + case ProjectType.LLM: + return this.projectLLMService.stopLLMProject(id); default: throw new Error(`Unknown project type ${projectType}`); } @@ -330,6 +332,8 @@ export class ProjectResolver { return this.projectRpcService.removeRpcProject(id); case ProjectType.SUBGRAPH: return this.projectSubgraphService.removeSubgraphProject(id); + case ProjectType.LLM: + return this.projectLLMService.removeLLMProject(id); default: throw new Error(`Unknown project type ${projectType}`); } From 07f7d6b43e6960282d7bb59030dc68d20d222abf Mon Sep 17 00:00:00 2001 From: luke <123917244@qq.com> Date: Mon, 29 Jul 2024 17:14:09 +0800 Subject: [PATCH 06/18] feat: add module --- apps/indexer-coordinator/src/app.module.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/indexer-coordinator/src/app.module.ts b/apps/indexer-coordinator/src/app.module.ts index dcaf10da7..d4bc7cbfb 100644 --- a/apps/indexer-coordinator/src/app.module.ts +++ b/apps/indexer-coordinator/src/app.module.ts @@ -28,6 +28,7 @@ import { ProjectModule } from './project/project.module'; import { RewardModule } from './reward/reward.module'; import { StatsModule } from './stats/stats.module'; import { SubscriptionModule } from './subscription/subscription.module'; +import { IntegrationModule } from './integration/integration.module'; @Module({ imports: [ @@ -81,6 +82,7 @@ import { SubscriptionModule } from './subscription/subscription.module'; NetworkModule, RewardModule, ConfigModule, + IntegrationModule, ], controllers: [AdminController, AgreementController, MonitorController], }) From c409259c549799fa4526f5b84ed3fb41b8ee446d Mon Sep 17 00:00:00 2001 From: luke <123917244@qq.com> Date: Tue, 30 Jul 2024 09:29:20 +0800 Subject: [PATCH 07/18] feat: pull status --- .../src/project/project.llm.service.ts | 36 +++++++++++++++++-- apps/indexer-coordinator/src/project/types.ts | 8 +++-- apps/indexer-coordinator/yarn.lock | 12 +++++++ 3 files changed, 51 insertions(+), 5 deletions(-) diff --git a/apps/indexer-coordinator/src/project/project.llm.service.ts b/apps/indexer-coordinator/src/project/project.llm.service.ts index f7cfbaaf0..a55d410a4 100644 --- a/apps/indexer-coordinator/src/project/project.llm.service.ts +++ b/apps/indexer-coordinator/src/project/project.llm.service.ts @@ -9,9 +9,16 @@ import { OllamaEvent } from 'src/utils/subscription'; import { Repository } from 'typeorm'; import { SubscriptionService } from '../subscription/subscription.service'; import { LLMManifest } from './project.manifest'; -import { IProjectConfig, Project, ProjectEntity, ValidationResponse } from './project.model'; +import { + IProjectConfig, + Project, + ProjectEntity, + SeviceEndpoint, + ValidationResponse, +} from './project.model'; import { ProjectService } from './project.service'; import { DesiredStatus } from 'src/core/types'; +import { LLMEndpointAccessType, LLMEndpointType, SubqueryEndpointType } from './types'; const logger = getLogger('project.llm.service'); @@ -32,6 +39,10 @@ export class ProjectLLMService { if (!project) { project = await this.projectService.addProject(id); } + if (project.rateLimit !== rateLimit) { + project.rateLimit = rateLimit; + } + const endpoints = projectConfig.serviceEndpoints; const host = endpoints[0].value; @@ -44,18 +55,32 @@ export class ProjectLLMService { const model = allModels?.models?.find((m) => m.name === targetModel); if (!model) { + project.status = DesiredStatus.PULLING; + ollama.pull({ model: targetModel, stream: true }).then(async (stream) => { for await (const part of stream) { // console.log(part); this.pubSub.publish(OllamaEvent.PullProgress, part); } + project.status = DesiredStatus.RUNNING; + await this.projectRepo.save(project); }); } + + project.projectConfig = projectConfig; + project.serviceEndpoints = [ + new SeviceEndpoint( + LLMEndpointType.ApiGenerateEndpoint, + this.nodeEndpoint(host, '/v1/chat/completions'), + LLMEndpointAccessType[LLMEndpointType.ApiGenerateEndpoint] + ), + ]; } catch (err) { - logger.error(`validate llm host: ${host} failed: ${err.message}`); + logger.error(`Failed to start LLM project host: ${host} failed: ${err.message}`); throw new Error(`Failed to start LLM project: ${err.message}`); } - return project; + + return await this.projectRepo.save(project); } async stopLLMProject(id: string): Promise { @@ -92,4 +117,9 @@ export class ProjectLLMService { return { valid: false, reason: err.message }; } } + + nodeEndpoint(host: string, input: string): string { + const url = new URL(input, host); + return url.toString(); + } } diff --git a/apps/indexer-coordinator/src/project/types.ts b/apps/indexer-coordinator/src/project/types.ts index c717f12f4..0d6514499 100644 --- a/apps/indexer-coordinator/src/project/types.ts +++ b/apps/indexer-coordinator/src/project/types.ts @@ -80,8 +80,8 @@ export enum SubgraphEndpointType { MetricsEndpoint = 'metrics-endpoint', } -export enum OllamaEndpointType { - HttpEndpoint = 'http-endpoint', +export enum LLMEndpointType { + ApiGenerateEndpoint = 'api-generate-endpoint', } export const SubgraphEndpointAccessType = { @@ -92,6 +92,10 @@ export const SubgraphEndpointAccessType = { [SubgraphEndpointType.MetricsEndpoint]: AccessType.INTERNAL, }; +export const LLMEndpointAccessType = { + [LLMEndpointType.ApiGenerateEndpoint]: AccessType.DEFAULT, +}; + @InputType('SubgraphPort') @ObjectType('SubgraphPort') export class SubgraphPort { diff --git a/apps/indexer-coordinator/yarn.lock b/apps/indexer-coordinator/yarn.lock index 9cc7ec074..c95c2095d 100644 --- a/apps/indexer-coordinator/yarn.lock +++ b/apps/indexer-coordinator/yarn.lock @@ -6784,6 +6784,13 @@ obuf@~1.1.2: resolved "https://registry.yarnpkg.com/obuf/-/obuf-1.1.2.tgz#09bea3343d41859ebd446292d11c9d4db619084e" integrity sha512-PX1wu0AmAdPqOL1mWhqmlOd8kOIZQwGZw6rh7uby9fTc5lhaOWFLX3I6R1hrF9k3zUY40e6igsLGkDXK92LJNg== +ollama@^0.5.6: + version "0.5.6" + resolved "https://registry.yarnpkg.com/ollama/-/ollama-0.5.6.tgz#351bd08aca7a7c3a4f3aba710dfbbefa784be1e6" + integrity sha512-4BySAMt96+OCt4emL6DE78UBCGqC7GvteM9LRCd6WwJyefn0x9w2BrcUcLm9nJ9bYpRsmkhf0Au18Q5MhsA14w== + dependencies: + whatwg-fetch "^3.6.20" + on-exit-leak-free@^0.2.0: version "0.2.0" resolved "https://registry.yarnpkg.com/on-exit-leak-free/-/on-exit-leak-free-0.2.0.tgz#b39c9e3bf7690d890f4861558b0d7b90a442d209" @@ -8814,6 +8821,11 @@ whatwg-encoding@^1.0.5: dependencies: iconv-lite "0.4.24" +whatwg-fetch@^3.6.20: + version "3.6.20" + resolved "https://registry.yarnpkg.com/whatwg-fetch/-/whatwg-fetch-3.6.20.tgz#580ce6d791facec91d37c72890995a0b48d31c70" + integrity sha512-EqhiFU6daOA8kpjOWTL0olhVOF3i7OrFzSYiGsEMB8GcXS+RrzauAERX65xMeNWVqxA6HXH2m69Z9LaKKdisfg== + whatwg-mimetype@^2.3.0: version "2.3.0" resolved "https://registry.yarnpkg.com/whatwg-mimetype/-/whatwg-mimetype-2.3.0.tgz#3d4b1e0312d2079879f826aff18dbeeca5960fbf" From 39cd664520ffc8204769343bb9a09888b829b969 Mon Sep 17 00:00:00 2001 From: luke <123917244@qq.com> Date: Tue, 30 Jul 2024 09:29:53 +0800 Subject: [PATCH 08/18] feat: status --- apps/indexer-coordinator/src/core/types.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/apps/indexer-coordinator/src/core/types.ts b/apps/indexer-coordinator/src/core/types.ts index 081b75e4e..f7a89a9c0 100644 --- a/apps/indexer-coordinator/src/core/types.ts +++ b/apps/indexer-coordinator/src/core/types.ts @@ -6,6 +6,9 @@ import { BigNumber, ContractTransaction, Overrides } from 'ethers'; export enum DesiredStatus { STOPPED, RUNNING, + + // LLM + PULLING, } export enum IndexerDeploymentStatus { From cff6f5240e6f0a420fb361fe72c208ba96e783eb Mon Sep 17 00:00:00 2001 From: luke <123917244@qq.com> Date: Fri, 2 Aug 2024 15:44:13 +0800 Subject: [PATCH 09/18] feat: demo version --- apps/indexer-coordinator/package.json | 1 + .../src/integration/integration.model.ts | 5 +- .../src/integration/integration.resolver.ts | 22 +- .../src/integration/integration.service.ts | 49 +++- .../src/project/project.llm.service.ts | 260 +++++++++++++++--- .../src/project/project.manifest.ts | 2 + .../src/project/project.model.ts | 6 +- .../src/project/project.resolver.ts | 27 +- apps/indexer-coordinator/src/project/types.ts | 37 +++ 9 files changed, 363 insertions(+), 46 deletions(-) diff --git a/apps/indexer-coordinator/package.json b/apps/indexer-coordinator/package.json index f3e0579b5..2e508419e 100644 --- a/apps/indexer-coordinator/package.json +++ b/apps/indexer-coordinator/package.json @@ -70,6 +70,7 @@ "lodash": "^4.17.21", "lru-cache": "^7.8.1", "node-fetch": "2.6.7", + "ollama": "^0.5.6", "pg": "^8.7.3", "pino": "^7.3.0", "portfinder": "^1.0.32", diff --git a/apps/indexer-coordinator/src/integration/integration.model.ts b/apps/indexer-coordinator/src/integration/integration.model.ts index 37cffacf6..770528dff 100644 --- a/apps/indexer-coordinator/src/integration/integration.model.ts +++ b/apps/indexer-coordinator/src/integration/integration.model.ts @@ -11,7 +11,7 @@ import { UpdateDateColumn, } from 'typeorm'; import { SeviceEndpoint } from '../project/project.model'; -import { IntegrationType } from '../project/types'; +import { IntegrationType, LLMModel } from '../project/types'; @Entity('integration') @Index(['title'], { unique: true }) @@ -33,6 +33,9 @@ export class IntegrationEntity { @Field(() => [SeviceEndpoint], { nullable: true }) serviceEndpoints: SeviceEndpoint[]; + @Field(() => [LLMModel], { nullable: true }) + models: LLMModel[]; + @Column({ type: 'boolean', default: false }) @Field() enabled: boolean; diff --git a/apps/indexer-coordinator/src/integration/integration.resolver.ts b/apps/indexer-coordinator/src/integration/integration.resolver.ts index bf8f7f6bc..a6a369f01 100644 --- a/apps/indexer-coordinator/src/integration/integration.resolver.ts +++ b/apps/indexer-coordinator/src/integration/integration.resolver.ts @@ -3,7 +3,7 @@ import { Args, Mutation, Query, Resolver } from '@nestjs/graphql'; import { SeviceEndpoint } from '../project/project.model'; -import { IntegrationType, LLMConfig, LLMExtra } from '../project/types'; +import { IntegrationType, LLMConfig, LLMExtra, LLMModelPullResult } from '../project/types'; import { IntegrationEntity } from './integration.model'; import { IntegrationService } from './integration.service'; @@ -41,4 +41,24 @@ export class IntegrationResolver { ): Promise { return this.integrationService.update(id, title, serviceEndpoints, enabled, config, extra); } + + @Mutation(() => IntegrationEntity) + deleteIntegration(@Args('id') id: number): Promise { + return this.integrationService.delete(id); + } + + @Mutation(() => IntegrationEntity) + deleteModel(@Args('id') id: number, @Args('name') name: string): Promise { + return this.integrationService.deleteModel(id, name); + } + + @Mutation(() => IntegrationEntity) + pullModel(@Args('id') id: number, @Args('name') name: string): Promise { + return this.integrationService.pullModel(id, name); + } + + @Query(() => [LLMModelPullResult]) + inspectDownload(): LLMModelPullResult[] { + return this.integrationService.inspectDownload(); + } } diff --git a/apps/indexer-coordinator/src/integration/integration.service.ts b/apps/indexer-coordinator/src/integration/integration.service.ts index ffb5042da..587f413b1 100644 --- a/apps/indexer-coordinator/src/integration/integration.service.ts +++ b/apps/indexer-coordinator/src/integration/integration.service.ts @@ -6,7 +6,7 @@ import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; import { ProjectLLMService } from '../project/project.llm.service'; import { SeviceEndpoint, ValidationResponse } from '../project/project.model'; -import { IntegrationType, LLMConfig, LLMExtra } from '../project/types'; +import { IntegrationType, LLMConfig, LLMExtra, LLMModelPullResult } from '../project/types'; import { IntegrationEntity } from './integration.model'; @Injectable() @@ -22,7 +22,16 @@ export class IntegrationService { } async getAll(): Promise { - return this.integrationRepo.find(); + const integrations = await this.integrationRepo.find(); + + for (const it of integrations) { + if (it.type === IntegrationType.LLM && it.serviceEndpoints.length > 0) { + const models = await this.projectLLMService.getModels(it.serviceEndpoints[0].value); + it.models = models; + } + } + + return integrations; } async create( @@ -99,4 +108,40 @@ export class IntegrationService { integration.extra = extra || {}; return this.integrationRepo.save(integration); } + + async delete(id: number): Promise { + const integration = await this.integrationRepo.findOne({ where: { id } }); + if (!integration) return; + return this.integrationRepo.remove(integration); + } + + async deleteModel(id: number, name: string): Promise { + const integration = await this.integrationRepo.findOne({ where: { id } }); + if (!integration) { + throw new Error(`${id} not exist`); + } + if (integration.type !== IntegrationType.LLM) { + throw new Error(`${id} not supported`); + } + const host = integration.serviceEndpoints[0].value; + await this.projectLLMService.deleteModel(host, name); + return integration; + } + + async pullModel(id: number, name: string) { + const integration = await this.integrationRepo.findOne({ where: { id } }); + if (!integration) { + throw new Error(`${id} not exist`); + } + if (integration.type !== IntegrationType.LLM) { + throw new Error(`${id} not supported`); + } + const host = integration.serviceEndpoints[0].value; + this.projectLLMService.pullModel(host, name); + return integration; + } + + inspectDownload(): LLMModelPullResult[] { + return this.projectLLMService.inspectPullingProgress(); + } } diff --git a/apps/indexer-coordinator/src/project/project.llm.service.ts b/apps/indexer-coordinator/src/project/project.llm.service.ts index a55d410a4..56a9ed9db 100644 --- a/apps/indexer-coordinator/src/project/project.llm.service.ts +++ b/apps/indexer-coordinator/src/project/project.llm.service.ts @@ -4,26 +4,34 @@ import { Injectable } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Ollama } from 'ollama'; +import { DesiredStatus } from 'src/core/types'; import { getLogger } from 'src/utils/logger'; -import { OllamaEvent } from 'src/utils/subscription'; import { Repository } from 'typeorm'; import { SubscriptionService } from '../subscription/subscription.service'; import { LLMManifest } from './project.manifest'; import { IProjectConfig, + MetadataType, Project, ProjectEntity, SeviceEndpoint, ValidationResponse, } from './project.model'; import { ProjectService } from './project.service'; -import { DesiredStatus } from 'src/core/types'; -import { LLMEndpointAccessType, LLMEndpointType, SubqueryEndpointType } from './types'; +import { + LLMEndpointAccessType, + LLMEndpointType, + LLMModel, + LLMModelPullResult, + LLMModelStatus, +} from './types'; const logger = getLogger('project.llm.service'); @Injectable() export class ProjectLLMService { + private pullingProgress = new Map(); + constructor( @InjectRepository(ProjectEntity) private projectRepo: Repository, private projectService: ProjectService, @@ -42,44 +50,47 @@ export class ProjectLLMService { if (project.rateLimit !== rateLimit) { project.rateLimit = rateLimit; } - - const endpoints = projectConfig.serviceEndpoints; - const host = endpoints[0].value; - - const manifest = project.manifest as LLMManifest; - const targetModel = manifest.model.name; + const host = new URL(projectConfig.serviceEndpoints[0].value).toString(); + const targetModel = (project.manifest as LLMManifest).model.name; + const normalizedModel = this.normalizeModelName(targetModel); try { const ollama = new Ollama({ host }); const allModels = await ollama.list(); - const model = allModels?.models?.find((m) => m.name === targetModel); + const model = allModels?.models?.find((m) => m.name === normalizedModel); if (!model) { - project.status = DesiredStatus.PULLING; - - ollama.pull({ model: targetModel, stream: true }).then(async (stream) => { - for await (const part of stream) { - // console.log(part); - this.pubSub.publish(OllamaEvent.PullProgress, part); - } - project.status = DesiredStatus.RUNNING; - await this.projectRepo.save(project); - }); - } + if (!this.getOnPullingModels(host).find((m) => m.name === normalizedModel)) { + project.status = DesiredStatus.PULLING; - project.projectConfig = projectConfig; - project.serviceEndpoints = [ - new SeviceEndpoint( - LLMEndpointType.ApiGenerateEndpoint, - this.nodeEndpoint(host, '/v1/chat/completions'), - LLMEndpointAccessType[LLMEndpointType.ApiGenerateEndpoint] - ), - ]; + ollama + .pull({ model: normalizedModel, stream: true }) + .then(async (stream) => { + for await (const part of stream) { + this.updatePullingProgress(host, { name: normalizedModel, ...part }); + } + this.removePullingProgress(host, normalizedModel); + project.status = DesiredStatus.RUNNING; + await this.projectRepo.save(project); + }) + .catch((err) => { + this.removePullingProgress(host, normalizedModel); + logger.error(`${id} pull model:${normalizedModel} host: ${host} failed: ${err.message}`); + }); + } + } } catch (err) { - logger.error(`Failed to start LLM project host: ${host} failed: ${err.message}`); - throw new Error(`Failed to start LLM project: ${err.message}`); + logger.error(`startLLMProject id: ${id} failed: ${err.message}`); } + project.projectConfig = projectConfig; + project.serviceEndpoints = [ + new SeviceEndpoint( + LLMEndpointType.ApiGenerateEndpoint, + this.nodeEndpoint(host, '/v1/chat/completions'), + LLMEndpointAccessType[LLMEndpointType.ApiGenerateEndpoint] + ), + ]; return await this.projectRepo.save(project); } @@ -96,14 +107,13 @@ export class ProjectLLMService { const project = await this.projectService.getProject(id); if (!project) return []; const endpoints = project.serviceEndpoints; - const host = endpoints[0].value; - + const host = endpoints[0]?.value; const manifest = project.manifest as LLMManifest; - const targetModel = manifest.model.name; - - const ollama = new Ollama({ host }); - await ollama.delete({ model: targetModel }); - + const targetModel = manifest?.model?.name; + if (host && targetModel) { + const ollama = new Ollama({ host }); + await ollama.delete({ model: targetModel }); + } return this.projectRepo.remove([project]); } @@ -118,8 +128,182 @@ export class ProjectLLMService { } } + async getModels(host: string): Promise { + const res = []; + try { + host = new URL(host).toString(); + const ollama = new Ollama({ host }); + + const downloadedModels = await ollama.list(); + const loadedModels = await ollama.ps(); + const pullingModels = this.getOnPullingModels(host); + + const loadedModelNames = loadedModels?.models?.map((m) => m.name); + + downloadedModels?.models?.forEach((m) => { + res.push({ + name: m.name, + size: m.size, + digest: m.digest, + status: loadedModelNames.find((lm) => lm === m.name) + ? LLMModelStatus.LOADED + : LLMModelStatus.NORMAL, + }); + }); + + pullingModels.forEach((m) => { + res.push({ name: m.name, status: LLMModelStatus.PULLING, pullStatus: m }); + }); + } catch (err) { + logger.error(`getModels host: ${host} failed: ${err.message}`); + } + return res; + } + + async deleteModel(host: string, model: string): Promise { + host = new URL(host).toString(); + model = this.normalizeModelName(model); + const ollama = new Ollama({ host }); + await ollama.delete({ model }); + } + + pullModel(host: string, model: string): void { + host = new URL(host).toString(); + const ollama = new Ollama({ host }); + ollama + .pull({ model, stream: true }) + .then(async (stream) => { + for await (const part of stream) { + console.log(part); + this.updatePullingProgress(host, { name: model, ...part }); + } + this.removePullingProgress(host, model); + }) + .catch((err) => { + this.removePullingProgress(host, model); + logger.error(`pull model:${model} host: ${host} failed: ${err.message}`); + }); + } + + async getLLMMetadata(id: string): Promise { + const project = await this.projectService.getProject(id); + if (!project) { + return; + } + const manifest = project.manifest as LLMManifest; + const model = manifest?.model?.name; + const endpoints = project.projectConfig.serviceEndpoints; + const host = endpoints[0]?.value; + + let m = null; + if (model && host) { + m = await this.getModel(host, model); + } + + return { + startHeight: 0, + lastHeight: 0, + lastTime: 0, + targetHeight: 0, + healthy: true, + chain: '', + specName: '', + genesisHash: '', + indexerNodeVersion: '', + queryNodeVersion: '', + indexerStatus: '', + queryStatus: '', + model: m, + }; + } + + private updatePullingProgress(host: string, part: LLMModelPullResult) { + const progress = this.pullingProgress.get(host) || []; + const index = progress.findIndex((p) => p.name === part.name); + if (index >= 0) { + progress[index] = part; + } else { + progress.push(part); + } + this.pullingProgress.set(host, progress); + } + + async getModel(host: string, model: string): Promise { + const res: LLMModel = { + name: model, + status: LLMModelStatus.NOT_READY, + }; + try { + const normalizedModel = this.normalizeModelName(model); + host = new URL(host).toString(); + const pullingModels = this.getOnPullingModels(host); + const pullingModel = pullingModels.find((m) => m.name === normalizedModel); + if (pullingModels.find((m) => m.name === normalizedModel)) { + res.status = LLMModelStatus.PULLING; + res.pullStatus = pullingModel; + return res; + } + const ollama = new Ollama({ host }); + const downloadedModels = await ollama.list(); + const loadedModels = await ollama.ps(); + + if (downloadedModels?.models?.find((m) => m.name === normalizedModel)) { + res.status = LLMModelStatus.NORMAL; + } + if (loadedModels.models?.find((lm) => lm.name === normalizedModel)) { + res.status = LLMModelStatus.LOADED; + } + } catch (err) { + logger.error(`getModel host: ${host} model: ${model} failed: ${err.message}`); + } + return res; + } + + // todo: remove + getPullingProgress(host: string, model: string): LLMModelPullResult { + host = new URL(host).toString(); + model = this.normalizeModelName(model); + const progress = this.pullingProgress.get(host) || []; + return progress.find((p) => p.name === model); + } + + private getOnPullingModels(host: string): LLMModelPullResult[] { + return this.pullingProgress.get(host) || []; + } + + private removePullingProgress(host: string, model: string) { + const progress = this.pullingProgress.get(host) || []; + const index = progress.findIndex((p) => p.name === model); + if (index >= 0) { + progress.splice(index, 1); + } + if (progress.length === 0) { + this.pullingProgress.delete(host); + } else { + this.pullingProgress.set(host, progress); + } + } + + inspectPullingProgress(): LLMModelPullResult[] { + const res = []; + for (const [host, pulls] of this.pullingProgress.entries()) { + for (const pull of pulls) { + res.push({ host, ...pull }); + } + } + console.log('inspectPullingProgress:', res); + return res; + } + nodeEndpoint(host: string, input: string): string { const url = new URL(input, host); return url.toString(); } + + normalizeModelName(model: string): string { + if (model.lastIndexOf(':') === -1) { + return model + ':latest'; + } + return model; + } } diff --git a/apps/indexer-coordinator/src/project/project.manifest.ts b/apps/indexer-coordinator/src/project/project.manifest.ts index 1d2cc2c8c..cc04f6145 100644 --- a/apps/indexer-coordinator/src/project/project.manifest.ts +++ b/apps/indexer-coordinator/src/project/project.manifest.ts @@ -143,4 +143,6 @@ export class AggregatedManifest { rpcManifest?: RpcManifest; @Field(() => SubgraphManifest, { nullable: true }) subgraphManifest?: SubgraphManifest; + @Field(() => LLMManifest, { nullable: true }) + llmManifest?: LLMManifest; } diff --git a/apps/indexer-coordinator/src/project/project.model.ts b/apps/indexer-coordinator/src/project/project.model.ts index d45d249cd..603a17381 100644 --- a/apps/indexer-coordinator/src/project/project.model.ts +++ b/apps/indexer-coordinator/src/project/project.model.ts @@ -2,8 +2,8 @@ // SPDX-License-Identifier: Apache-2.0 import { Field, ID, InputType, Int, ObjectType } from '@nestjs/graphql'; -import { Column, Entity, PrimaryColumn, BeforeInsert, PrimaryGeneratedColumn } from 'typeorm'; -import { AccessType, HostType, IntegrationType, ProjectType } from './types'; +import { Column, Entity, PrimaryColumn, BeforeInsert } from 'typeorm'; +import { AccessType, HostType, LLMModel, ProjectType } from './types'; // TODO: temp place to put these types @ObjectType('ProjectInfo') @@ -76,6 +76,8 @@ export class MetadataType { indexerStatus?: string; @Field({ nullable: true }) queryStatus?: string; + @Field({ nullable: true }) + model?: LLMModel; } export interface IProjectBaseConfig { diff --git a/apps/indexer-coordinator/src/project/project.resolver.ts b/apps/indexer-coordinator/src/project/project.resolver.ts index fe767058b..9b930d07d 100644 --- a/apps/indexer-coordinator/src/project/project.resolver.ts +++ b/apps/indexer-coordinator/src/project/project.resolver.ts @@ -28,6 +28,7 @@ import { ProjectSubgraphService } from './project.subgraph.service'; import { AccessType, HostType, + LLMModelPullResult, ProjectType, SubgraphEndpoint, SubgraphEndpointAccessType, @@ -63,7 +64,9 @@ export class ProjectResolver { ) { let project: Project; if (projectType === undefined) { - projectType = await this.projectService.getProjectType(id); + // todo: remove + // projectType = await this.projectService.getProjectType(id); + projectType = ProjectType.LLM; } switch (projectType) { case ProjectType.SUBQUERY: @@ -77,6 +80,8 @@ export class ProjectResolver { return this.projectRpcService.getRpcMetadata(id); case ProjectType.SUBGRAPH: return this.projectSubgraphService.getSubgraphMetadata(id); + case ProjectType.LLM: + return this.projectLLMService.getLLMMetadata(id); default: throw new Error(`Unknown project type ${projectType}`); } @@ -126,6 +131,11 @@ export class ProjectResolver { ...project, metadata: await this.projectSubgraphService.getSubgraphMetadata(project.id), }; + case ProjectType.LLM: + return { + ...project, + metadata: await this.projectLLMService.getLLMMetadata(project.id), + }; default: throw new Error(`Unknown project type ${project.projectType}`); } @@ -193,7 +203,9 @@ export class ProjectResolver { ): Promise { const manifest = new AggregatedManifest(); if (projectType === undefined) { - projectType = await this.projectService.getProjectType(projectId); + // todo: remove + // projectType = await this.projectService.getProjectType(projectId); + projectType = ProjectType.LLM; } switch (projectType) { case ProjectType.SUBQUERY: @@ -205,6 +217,9 @@ export class ProjectResolver { case ProjectType.SUBGRAPH: manifest.subgraphManifest = await this.projectService.getManifest(projectId); break; + case ProjectType.LLM: + manifest.llmManifest = await this.projectService.getManifest(projectId); + break; default: throw new Error(`Unknown project type ${projectType}`); } @@ -374,6 +389,14 @@ export class ProjectResolver { return (await this.dbStatsService.getProjectDbStats(id)).size || '0'; } + @Query(() => LLMModelPullResult, { nullable: true }) + getPullingProgress( + @Args('host') host: string, + @Args('model') model: string + ): LLMModelPullResult { + return this.projectLLMService.getPullingProgress(host, model); + } + @Subscription(() => String) progressChanged() { return this.pubSub.asyncIterator(OllamaEvent.PullProgress); diff --git a/apps/indexer-coordinator/src/project/types.ts b/apps/indexer-coordinator/src/project/types.ts index 0d6514499..d95d12e64 100644 --- a/apps/indexer-coordinator/src/project/types.ts +++ b/apps/indexer-coordinator/src/project/types.ts @@ -22,6 +22,43 @@ export enum IntegrationType { LLM, } +export enum LLMModelStatus { + NOT_READY = 'notReady', + NORMAL = 'normal', + PULLING = 'pulling', + LOADED = 'loaded', +} + +@ObjectType() +export class LLMModelPullResult { + @Field() + name: string; + @Field() + status: string; + @Field({ nullable: true }) + host?: string; + @Field({ nullable: true }) + digest?: string; + @Field({ nullable: true }) + total?: number; + @Field({ nullable: true }) + completed?: number; +} + +@ObjectType() +export class LLMModel { + @Field() + name: string; + @Field({ nullable: true }) + size?: number; + @Field({ nullable: true }) + digest?: string; + @Field() + status: LLMModelStatus; + @Field({ nullable: true }) + pullStatus?: LLMModelPullResult; +} + @InputType() @ObjectType() export class LLMConfig { From 84e4d981323e4f73c32caed4ee40cba47f3bfaf8 Mon Sep 17 00:00:00 2001 From: luke <123917244@qq.com> Date: Fri, 2 Aug 2024 16:07:33 +0800 Subject: [PATCH 10/18] build: v2.2.4-1 --- apps/indexer-coordinator/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/indexer-coordinator/package.json b/apps/indexer-coordinator/package.json index 2e508419e..daf499cbc 100644 --- a/apps/indexer-coordinator/package.json +++ b/apps/indexer-coordinator/package.json @@ -1,6 +1,6 @@ { "name": "@subql/indexer-coordinator", - "version": "2.2.4-0", + "version": "2.2.4-1", "description": "", "author": "SubQuery", "license": "Apache-2.0", From 7f9e809c78f0e6ce3eb0b9b92fb55baa4613762a Mon Sep 17 00:00:00 2001 From: luke <123917244@qq.com> Date: Sun, 4 Aug 2024 14:29:56 +0800 Subject: [PATCH 11/18] feat: use node-fetch --- .../src/integration/integration.resolver.ts | 13 +- .../src/integration/integration.service.ts | 6 +- .../src/project/project.llm.service.ts | 154 ++++++++++----- apps/indexer-coordinator/src/project/types.ts | 8 + apps/indexer-coordinator/src/utils/ollama.ts | 184 ++++++++++++++++++ 5 files changed, 316 insertions(+), 49 deletions(-) create mode 100644 apps/indexer-coordinator/src/utils/ollama.ts diff --git a/apps/indexer-coordinator/src/integration/integration.resolver.ts b/apps/indexer-coordinator/src/integration/integration.resolver.ts index a6a369f01..23a4278b2 100644 --- a/apps/indexer-coordinator/src/integration/integration.resolver.ts +++ b/apps/indexer-coordinator/src/integration/integration.resolver.ts @@ -3,7 +3,13 @@ import { Args, Mutation, Query, Resolver } from '@nestjs/graphql'; import { SeviceEndpoint } from '../project/project.model'; -import { IntegrationType, LLMConfig, LLMExtra, LLMModelPullResult } from '../project/types'; +import { + IntegrationType, + LLMConfig, + LLMExtra, + LLMModelPullResult, + LLMOngoingStreamRequestMeta, +} from '../project/types'; import { IntegrationEntity } from './integration.model'; import { IntegrationService } from './integration.service'; @@ -61,4 +67,9 @@ export class IntegrationResolver { inspectDownload(): LLMModelPullResult[] { return this.integrationService.inspectDownload(); } + + @Query(() => [LLMOngoingStreamRequestMeta]) + inspectOngoingStreamedRequests(): LLMOngoingStreamRequestMeta[] { + return this.integrationService.inspectOngoingStreamedRequests(); + } } diff --git a/apps/indexer-coordinator/src/integration/integration.service.ts b/apps/indexer-coordinator/src/integration/integration.service.ts index 587f413b1..9016f9d32 100644 --- a/apps/indexer-coordinator/src/integration/integration.service.ts +++ b/apps/indexer-coordinator/src/integration/integration.service.ts @@ -6,7 +6,7 @@ import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; import { ProjectLLMService } from '../project/project.llm.service'; import { SeviceEndpoint, ValidationResponse } from '../project/project.model'; -import { IntegrationType, LLMConfig, LLMExtra, LLMModelPullResult } from '../project/types'; +import { IntegrationType, LLMConfig, LLMExtra, LLMModelPullResult, LLMOngoingStreamRequestMeta } from '../project/types'; import { IntegrationEntity } from './integration.model'; @Injectable() @@ -144,4 +144,8 @@ export class IntegrationService { inspectDownload(): LLMModelPullResult[] { return this.projectLLMService.inspectPullingProgress(); } + + inspectOngoingStreamedRequests(): LLMOngoingStreamRequestMeta[] { + return this.projectLLMService.inspectOngoingStreamedRequests(); + } } diff --git a/apps/indexer-coordinator/src/project/project.llm.service.ts b/apps/indexer-coordinator/src/project/project.llm.service.ts index 56a9ed9db..440fa97c7 100644 --- a/apps/indexer-coordinator/src/project/project.llm.service.ts +++ b/apps/indexer-coordinator/src/project/project.llm.service.ts @@ -3,11 +3,12 @@ import { Injectable } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; -import { Ollama } from 'ollama'; +// import { Ollama } from 'ollama'; +import fetch from 'node-fetch'; import { DesiredStatus } from 'src/core/types'; import { getLogger } from 'src/utils/logger'; +import { AbortableAsyncIterator, Ollama, normalizeModelName } from 'src/utils/ollama'; import { Repository } from 'typeorm'; -import { SubscriptionService } from '../subscription/subscription.service'; import { LLMManifest } from './project.manifest'; import { IProjectConfig, @@ -28,14 +29,65 @@ import { const logger = getLogger('project.llm.service'); +async function fetchAdapter(input: string | URL | Request, init?: RequestInit): Promise { + let url: string; + if (input instanceof URL) { + url = input.toString(); + } else if (typeof input === 'string') { + url = input; + } else { + url = input.url; + } + + const r = await fetch(url, { + method: init.method, + headers: init?.headers as any, + body: init?.body as any, + }); + + // const t = await r.text(); + const sstream: NodeJS.ReadableStream = r.body; + + const dstream: ReadableStream = new ReadableStream({ + start(controller) { + sstream.on('readable', function () { + let data; + while ((data = this.read()) !== null) { + console.log('data:', data.toString()); + controller.enqueue(data); + } + }); + sstream.on('end', () => { + console.log('end'); + controller.close(); + }); + sstream.on('error', (err) => { + console.log('error:', err); + controller.error(err); + }); + }, + cancel(reason) { + if ((sstream as any).destroy) { + (sstream as any).destroy.destroy(new Error(reason)); + } + }, + }); + const res = new Response(dstream, { + headers: r.headers as any, + status: r.status, + statusText: r.statusText, + }); + return res; +} + @Injectable() export class ProjectLLMService { private pullingProgress = new Map(); + private ongoingStreamedRequests: AbortableAsyncIterator[] = []; constructor( @InjectRepository(ProjectEntity) private projectRepo: Repository, - private projectService: ProjectService, - private pubSub: SubscriptionService + private projectService: ProjectService ) {} async startLLMProject( @@ -50,39 +102,16 @@ export class ProjectLLMService { if (project.rateLimit !== rateLimit) { project.rateLimit = rateLimit; } - const host = new URL(projectConfig.serviceEndpoints[0].value).toString(); + const host = projectConfig.serviceEndpoints[0].value; const targetModel = (project.manifest as LLMManifest).model.name; - const normalizedModel = this.normalizeModelName(targetModel); try { - const ollama = new Ollama({ host }); - const allModels = await ollama.list(); - - const model = allModels?.models?.find((m) => m.name === normalizedModel); - if (!model) { - if (!this.getOnPullingModels(host).find((m) => m.name === normalizedModel)) { - project.status = DesiredStatus.PULLING; - - ollama - .pull({ model: normalizedModel, stream: true }) - .then(async (stream) => { - for await (const part of stream) { - this.updatePullingProgress(host, { name: normalizedModel, ...part }); - } - this.removePullingProgress(host, normalizedModel); - project.status = DesiredStatus.RUNNING; - await this.projectRepo.save(project); - }) - .catch((err) => { - this.removePullingProgress(host, normalizedModel); - logger.error(`${id} pull model:${normalizedModel} host: ${host} failed: ${err.message}`); - }); - } - } + this.pullModel(host, targetModel); } catch (err) { logger.error(`startLLMProject id: ${id} failed: ${err.message}`); } + project.status = DesiredStatus.RUNNING; project.projectConfig = projectConfig; project.serviceEndpoints = [ new SeviceEndpoint( @@ -133,7 +162,7 @@ export class ProjectLLMService { try { host = new URL(host).toString(); const ollama = new Ollama({ host }); - + // const aaa = await fetch('http://127.0.0.1:11434/api/tags'); const downloadedModels = await ollama.list(); const loadedModels = await ollama.ps(); const pullingModels = this.getOnPullingModels(host); @@ -162,26 +191,49 @@ export class ProjectLLMService { async deleteModel(host: string, model: string): Promise { host = new URL(host).toString(); - model = this.normalizeModelName(model); + model = normalizeModelName(model); const ollama = new Ollama({ host }); await ollama.delete({ model }); + + const onPulling = this.ongoingStreamedRequests.find((iterator) => { + return iterator.meta.host === host && iterator.meta.model === model; + }); + onPulling?.abort(); } - pullModel(host: string, model: string): void { + async pullModel(host: string, model: string): Promise { host = new URL(host).toString(); + model = normalizeModelName(model); + + const onPulllingModels = this.pullingProgress.get(host) || []; + const existPulling = onPulllingModels.find((m) => m.name === model); + if (existPulling) { + return; + } + const ollama = new Ollama({ host }); + const allModels = await ollama.list(); + + const existModel = allModels?.models?.find((m) => m.name === model); + if (existModel) { + return; + } + let it: AbortableAsyncIterator; ollama .pull({ model, stream: true }) - .then(async (stream) => { - for await (const part of stream) { - console.log(part); - this.updatePullingProgress(host, { name: model, ...part }); + .then(async (iterator) => { + it = iterator; + this.ongoingStreamedRequests.push(iterator); + for await (const message of iterator) { + this.updatePullingProgress(host, { name: model, ...message }); } this.removePullingProgress(host, model); + this.removeOngoingStreamedRequests(it); }) .catch((err) => { this.removePullingProgress(host, model); - logger.error(`pull model:${model} host: ${host} failed: ${err.message}`); + this.removeOngoingStreamedRequests(it); + logger.error(`pull error model:${model} host: ${host} failed: ${err.message}`); }); } @@ -234,7 +286,7 @@ export class ProjectLLMService { status: LLMModelStatus.NOT_READY, }; try { - const normalizedModel = this.normalizeModelName(model); + const normalizedModel = normalizeModelName(model); host = new URL(host).toString(); const pullingModels = this.getOnPullingModels(host); const pullingModel = pullingModels.find((m) => m.name === normalizedModel); @@ -262,7 +314,7 @@ export class ProjectLLMService { // todo: remove getPullingProgress(host: string, model: string): LLMModelPullResult { host = new URL(host).toString(); - model = this.normalizeModelName(model); + model = normalizeModelName(model); const progress = this.pullingProgress.get(host) || []; return progress.find((p) => p.name === model); } @@ -284,6 +336,13 @@ export class ProjectLLMService { } } + private removeOngoingStreamedRequests(iterator: AbortableAsyncIterator) { + const i = this.ongoingStreamedRequests.indexOf(iterator); + if (i > -1) { + this.ongoingStreamedRequests.splice(i, 1); + } + } + inspectPullingProgress(): LLMModelPullResult[] { const res = []; for (const [host, pulls] of this.pullingProgress.entries()) { @@ -295,15 +354,16 @@ export class ProjectLLMService { return res; } + inspectOngoingStreamedRequests() { + const res = []; + for (const it of this.ongoingStreamedRequests) { + res.push(it.meta); + } + return res; + } + nodeEndpoint(host: string, input: string): string { const url = new URL(input, host); return url.toString(); } - - normalizeModelName(model: string): string { - if (model.lastIndexOf(':') === -1) { - return model + ':latest'; - } - return model; - } } diff --git a/apps/indexer-coordinator/src/project/types.ts b/apps/indexer-coordinator/src/project/types.ts index d95d12e64..4fa0a3850 100644 --- a/apps/indexer-coordinator/src/project/types.ts +++ b/apps/indexer-coordinator/src/project/types.ts @@ -73,6 +73,14 @@ export class LLMExtra { bar: string; } +@ObjectType() +export class LLMOngoingStreamRequestMeta { + @Field() + model: string; + @Field() + host: string; +} + export enum HostType { UN_RESOLVED = 'un-resolved', SYSTEM_MANAGED = 'system-managed', diff --git a/apps/indexer-coordinator/src/utils/ollama.ts b/apps/indexer-coordinator/src/utils/ollama.ts new file mode 100644 index 000000000..095532209 --- /dev/null +++ b/apps/indexer-coordinator/src/utils/ollama.ts @@ -0,0 +1,184 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: Apache-2.0 + +import fetch from 'node-fetch'; +import { LLMOngoingStreamRequestMeta } from 'src/project/types'; + +export interface Config { + host: string; +} + +export interface DeleteRequest { + model: string; +} + +export interface PullRequest { + model: string; + insecure?: boolean; + stream?: boolean; +} + +export interface ModelDetails { + parent_model: string; + format: string; + family: string; + families: string[]; + parameter_size: string; + quantization_level: string; +} + +export interface ModelResponse { + name: string; + modified_at: Date; + size: number; + digest: string; + details: ModelDetails; + expires_at: Date; + size_vram: number; +} + +export interface ListResponse { + models: ModelResponse[]; +} + +export interface ErrorResponse { + error: string; +} + +export class AbortableAsyncIterator { + private readonly abortController: AbortController; + // private readonly itr: AsyncGenerator; + private readonly stream: NodeJS.ReadableStream; + readonly meta: LLMOngoingStreamRequestMeta; + constructor( + abortController: AbortController, + // itr: AsyncGenerator, + stream: NodeJS.ReadableStream, + meta: LLMOngoingStreamRequestMeta + ) { + this.abortController = abortController; + this.stream = stream; + this.meta = meta; + } + + abort() { + this.abortController.abort(); + } + + async *[Symbol.asyncIterator]() { + let buffer = ''; + // console.log('========= terrateor ==='); + this.stream.setEncoding('utf8'); + for await (const message of this.stream) { + // console.log(`message:${message} hh`); + buffer += message; + const parts = buffer.split('\n'); + buffer = parts.pop() ?? ''; + for (const part of parts) { + try { + const message = JSON.parse(part); + if ('error' in message) { + throw new Error(message.error); + } + yield message; + + if (message.done || message.status === 'success') { + return; + } + } catch (error) { + console.warn('invalid json: ', part); + } + } + } + + for (const part of buffer.split('\n').filter((p) => p !== '')) { + try { + const message = JSON.parse(part); + if ('error' in message) { + throw new Error(message.error); + } + yield message; + + if (message.done || message.status === 'success') { + return; + } + } catch (error) { + console.warn('invalid json: ', part); + } + } + throw new Error('Did not receive done or success response in stream.'); + } +} + +const defaultHeaders = { + 'Content-Type': 'application/json', + Accept: 'application/json', +}; + +export class Ollama { + config: Config; + + constructor(config: Config) { + this.config = config; + } + + async list(): Promise { + const url = new URL('api/tags', this.config.host).toString(); + const response = await fetch(url, { + method: 'GET', + headers: defaultHeaders, + }); + return (await response.json()) as ListResponse; + } + + async ps(): Promise { + const url = new URL('api/ps', this.config.host).toString(); + const response = await fetch(url, { + method: 'GET', + headers: defaultHeaders, + }); + return (await response.json()) as ListResponse; + } + + async delete(request: DeleteRequest) { + const url = new URL('api/delete', this.config.host).toString(); + await fetch(url, { + method: 'DELETE', + body: JSON.stringify({ + name: request.model, + }), + headers: defaultHeaders, + }); + return { status: 'success' }; + } + + async pull(request: PullRequest) { + const host = new URL(this.config.host).toString(); + const url = new URL('api/pull', this.config.host).toString(); + const model = normalizeModelName(request.model); + const abortController = new AbortController(); + const response = await fetch(url, { + method: 'POST', + body: JSON.stringify({ + name: model, + stream: request.stream, + insecure: request.insecure, + }), + headers: defaultHeaders, + signal: abortController.signal, + }); + + const abortableAsyncIterator = new AbortableAsyncIterator(abortController, response.body, { + model, + host, + }); + return abortableAsyncIterator; + } +} + +export function normalizeModelName(model: string): string { + if (model.lastIndexOf(':') === -1) { + return model + ':latest'; + } + return model; +} From 5040bc897e59f45b1a2c86d644a2e68f6544c959 Mon Sep 17 00:00:00 2001 From: luke <123917244@qq.com> Date: Sun, 4 Aug 2024 23:23:36 +0800 Subject: [PATCH 12/18] refactor: remove useless --- .../src/integration/integration.resolver.ts | 6 -- .../src/integration/integration.service.ts | 4 -- .../src/project/project.llm.service.ts | 66 ++++++------------- apps/indexer-coordinator/src/project/types.ts | 2 + apps/indexer-coordinator/src/utils/ollama.ts | 48 ++++++++++++-- 5 files changed, 65 insertions(+), 61 deletions(-) diff --git a/apps/indexer-coordinator/src/integration/integration.resolver.ts b/apps/indexer-coordinator/src/integration/integration.resolver.ts index 23a4278b2..59146019a 100644 --- a/apps/indexer-coordinator/src/integration/integration.resolver.ts +++ b/apps/indexer-coordinator/src/integration/integration.resolver.ts @@ -7,7 +7,6 @@ import { IntegrationType, LLMConfig, LLMExtra, - LLMModelPullResult, LLMOngoingStreamRequestMeta, } from '../project/types'; import { IntegrationEntity } from './integration.model'; @@ -63,11 +62,6 @@ export class IntegrationResolver { return this.integrationService.pullModel(id, name); } - @Query(() => [LLMModelPullResult]) - inspectDownload(): LLMModelPullResult[] { - return this.integrationService.inspectDownload(); - } - @Query(() => [LLMOngoingStreamRequestMeta]) inspectOngoingStreamedRequests(): LLMOngoingStreamRequestMeta[] { return this.integrationService.inspectOngoingStreamedRequests(); diff --git a/apps/indexer-coordinator/src/integration/integration.service.ts b/apps/indexer-coordinator/src/integration/integration.service.ts index 9016f9d32..f59748469 100644 --- a/apps/indexer-coordinator/src/integration/integration.service.ts +++ b/apps/indexer-coordinator/src/integration/integration.service.ts @@ -141,10 +141,6 @@ export class IntegrationService { return integration; } - inspectDownload(): LLMModelPullResult[] { - return this.projectLLMService.inspectPullingProgress(); - } - inspectOngoingStreamedRequests(): LLMOngoingStreamRequestMeta[] { return this.projectLLMService.inspectOngoingStreamedRequests(); } diff --git a/apps/indexer-coordinator/src/project/project.llm.service.ts b/apps/indexer-coordinator/src/project/project.llm.service.ts index 440fa97c7..5a250f78d 100644 --- a/apps/indexer-coordinator/src/project/project.llm.service.ts +++ b/apps/indexer-coordinator/src/project/project.llm.service.ts @@ -3,7 +3,6 @@ import { Injectable } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; -// import { Ollama } from 'ollama'; import fetch from 'node-fetch'; import { DesiredStatus } from 'src/core/types'; import { getLogger } from 'src/utils/logger'; @@ -82,7 +81,6 @@ async function fetchAdapter(input: string | URL | Request, init?: RequestInit): @Injectable() export class ProjectLLMService { - private pullingProgress = new Map(); private ongoingStreamedRequests: AbortableAsyncIterator[] = []; constructor( @@ -162,7 +160,6 @@ export class ProjectLLMService { try { host = new URL(host).toString(); const ollama = new Ollama({ host }); - // const aaa = await fetch('http://127.0.0.1:11434/api/tags'); const downloadedModels = await ollama.list(); const loadedModels = await ollama.ps(); const pullingModels = this.getOnPullingModels(host); @@ -205,9 +202,10 @@ export class ProjectLLMService { host = new URL(host).toString(); model = normalizeModelName(model); - const onPulllingModels = this.pullingProgress.get(host) || []; - const existPulling = onPulllingModels.find((m) => m.name === model); - if (existPulling) { + const onPulling = this.ongoingStreamedRequests.find((iterator) => { + return iterator.meta.host === host && iterator.meta.model === model; + }); + if (onPulling) { return; } @@ -223,15 +221,13 @@ export class ProjectLLMService { .pull({ model, stream: true }) .then(async (iterator) => { it = iterator; - this.ongoingStreamedRequests.push(iterator); + this.addOngoinStreamedRequests(iterator); for await (const message of iterator) { - this.updatePullingProgress(host, { name: model, ...message }); + iterator.updateProgress({ name: model, ...message }); } - this.removePullingProgress(host, model); this.removeOngoingStreamedRequests(it); }) .catch((err) => { - this.removePullingProgress(host, model); this.removeOngoingStreamedRequests(it); logger.error(`pull error model:${model} host: ${host} failed: ${err.message}`); }); @@ -269,17 +265,6 @@ export class ProjectLLMService { }; } - private updatePullingProgress(host: string, part: LLMModelPullResult) { - const progress = this.pullingProgress.get(host) || []; - const index = progress.findIndex((p) => p.name === part.name); - if (index >= 0) { - progress[index] = part; - } else { - progress.push(part); - } - this.pullingProgress.set(host, progress); - } - async getModel(host: string, model: string): Promise { const res: LLMModel = { name: model, @@ -315,25 +300,25 @@ export class ProjectLLMService { getPullingProgress(host: string, model: string): LLMModelPullResult { host = new URL(host).toString(); model = normalizeModelName(model); - const progress = this.pullingProgress.get(host) || []; - return progress.find((p) => p.name === model); + + const onPulling = this.ongoingStreamedRequests.find((iterator) => { + return iterator.meta.host === host && iterator.meta.model === model; + }); + return onPulling?.meta.progress; } private getOnPullingModels(host: string): LLMModelPullResult[] { - return this.pullingProgress.get(host) || []; + const res = []; + for (const iter of this.ongoingStreamedRequests) { + if (iter.meta.host === host && iter.meta.progress) { + res.push(iter.meta.progress); + } + } + return res; } - private removePullingProgress(host: string, model: string) { - const progress = this.pullingProgress.get(host) || []; - const index = progress.findIndex((p) => p.name === model); - if (index >= 0) { - progress.splice(index, 1); - } - if (progress.length === 0) { - this.pullingProgress.delete(host); - } else { - this.pullingProgress.set(host, progress); - } + private addOngoinStreamedRequests(iterator: AbortableAsyncIterator) { + this.ongoingStreamedRequests.push(iterator); } private removeOngoingStreamedRequests(iterator: AbortableAsyncIterator) { @@ -343,17 +328,6 @@ export class ProjectLLMService { } } - inspectPullingProgress(): LLMModelPullResult[] { - const res = []; - for (const [host, pulls] of this.pullingProgress.entries()) { - for (const pull of pulls) { - res.push({ host, ...pull }); - } - } - console.log('inspectPullingProgress:', res); - return res; - } - inspectOngoingStreamedRequests() { const res = []; for (const it of this.ongoingStreamedRequests) { diff --git a/apps/indexer-coordinator/src/project/types.ts b/apps/indexer-coordinator/src/project/types.ts index 4fa0a3850..e723a1ff4 100644 --- a/apps/indexer-coordinator/src/project/types.ts +++ b/apps/indexer-coordinator/src/project/types.ts @@ -79,6 +79,8 @@ export class LLMOngoingStreamRequestMeta { model: string; @Field() host: string; + @Field({ nullable: true }) + progress?: LLMModelPullResult; } export enum HostType { diff --git a/apps/indexer-coordinator/src/utils/ollama.ts b/apps/indexer-coordinator/src/utils/ollama.ts index 095532209..cd5215fb9 100644 --- a/apps/indexer-coordinator/src/utils/ollama.ts +++ b/apps/indexer-coordinator/src/utils/ollama.ts @@ -1,8 +1,8 @@ // Copyright 2020-2024 SubQuery Pte Ltd authors & contributors // SPDX-License-Identifier: Apache-2.0 -import fetch from 'node-fetch'; -import { LLMOngoingStreamRequestMeta } from 'src/project/types'; +import fetch, { Response } from 'node-fetch'; +import { LLMModelPullResult, LLMOngoingStreamRequestMeta } from 'src/project/types'; export interface Config { host: string; @@ -65,6 +65,14 @@ export class AbortableAsyncIterator { this.abortController.abort(); } + updateProgress(progress: LLMModelPullResult) { + this.meta.progress = progress; + } + + getProgress() { + return this.meta.progress; + } + async *[Symbol.asyncIterator]() { let buffer = ''; // console.log('========= terrateor ==='); @@ -128,6 +136,7 @@ export class Ollama { method: 'GET', headers: defaultHeaders, }); + await checkOk(response); return (await response.json()) as ListResponse; } @@ -137,24 +146,26 @@ export class Ollama { method: 'GET', headers: defaultHeaders, }); + await checkOk(response); return (await response.json()) as ListResponse; } async delete(request: DeleteRequest) { const url = new URL('api/delete', this.config.host).toString(); - await fetch(url, { + const response = await fetch(url, { method: 'DELETE', body: JSON.stringify({ name: request.model, }), headers: defaultHeaders, }); + await checkOk(response); return { status: 'success' }; } async pull(request: PullRequest) { const host = new URL(this.config.host).toString(); - const url = new URL('api/pull', this.config.host).toString(); + const url = new URL('api/pull', host).toString(); const model = normalizeModelName(request.model); const abortController = new AbortController(); const response = await fetch(url, { @@ -167,7 +178,7 @@ export class Ollama { headers: defaultHeaders, signal: abortController.signal, }); - + await checkOk(response); const abortableAsyncIterator = new AbortableAsyncIterator(abortController, response.body, { model, host, @@ -182,3 +193,30 @@ export function normalizeModelName(model: string): string { } return model; } + +const checkOk = async (response: Response): Promise => { + if (response.ok) { + return; + } + let message = `Error ${response.status}: ${response.statusText}`; + let errorData: ErrorResponse | null = null; + + if (response.headers.get('content-type')?.includes('application/json')) { + try { + errorData = (await response.json()) as ErrorResponse; + message = errorData.error || message; + } catch (error) { + console.log('Failed to parse error response as JSON'); + } + } else { + try { + console.log('Getting text from response'); + const textResponse = await response.text(); + message = textResponse || message; + } catch (error) { + console.log('Failed to get text from error response'); + } + } + + throw new Error(message); +}; From 46905886685db2132e4a0f23512ca77a83a1a264 Mon Sep 17 00:00:00 2001 From: luke <123917244@qq.com> Date: Sun, 4 Aug 2024 23:47:26 +0800 Subject: [PATCH 13/18] fix: delete model --- apps/indexer-coordinator/src/project/project.llm.service.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/apps/indexer-coordinator/src/project/project.llm.service.ts b/apps/indexer-coordinator/src/project/project.llm.service.ts index 5a250f78d..dad28b496 100644 --- a/apps/indexer-coordinator/src/project/project.llm.service.ts +++ b/apps/indexer-coordinator/src/project/project.llm.service.ts @@ -189,9 +189,11 @@ export class ProjectLLMService { async deleteModel(host: string, model: string): Promise { host = new URL(host).toString(); model = normalizeModelName(model); - const ollama = new Ollama({ host }); - await ollama.delete({ model }); + try { + const ollama = new Ollama({ host }); + await ollama.delete({ model }); + } catch (err) {} const onPulling = this.ongoingStreamedRequests.find((iterator) => { return iterator.meta.host === host && iterator.meta.model === model; }); From fd773d37fc80815d8f009d54b6a240f8b758690c Mon Sep 17 00:00:00 2001 From: luke <123917244@qq.com> Date: Sun, 4 Aug 2024 23:52:21 +0800 Subject: [PATCH 14/18] build: v2.2.4-3 --- apps/indexer-coordinator/package.json | 3 +-- apps/indexer-coordinator/yarn.lock | 12 ------------ 2 files changed, 1 insertion(+), 14 deletions(-) diff --git a/apps/indexer-coordinator/package.json b/apps/indexer-coordinator/package.json index daf499cbc..0fe9818e5 100644 --- a/apps/indexer-coordinator/package.json +++ b/apps/indexer-coordinator/package.json @@ -1,6 +1,6 @@ { "name": "@subql/indexer-coordinator", - "version": "2.2.4-1", + "version": "2.2.4-3", "description": "", "author": "SubQuery", "license": "Apache-2.0", @@ -70,7 +70,6 @@ "lodash": "^4.17.21", "lru-cache": "^7.8.1", "node-fetch": "2.6.7", - "ollama": "^0.5.6", "pg": "^8.7.3", "pino": "^7.3.0", "portfinder": "^1.0.32", diff --git a/apps/indexer-coordinator/yarn.lock b/apps/indexer-coordinator/yarn.lock index c95c2095d..9cc7ec074 100644 --- a/apps/indexer-coordinator/yarn.lock +++ b/apps/indexer-coordinator/yarn.lock @@ -6784,13 +6784,6 @@ obuf@~1.1.2: resolved "https://registry.yarnpkg.com/obuf/-/obuf-1.1.2.tgz#09bea3343d41859ebd446292d11c9d4db619084e" integrity sha512-PX1wu0AmAdPqOL1mWhqmlOd8kOIZQwGZw6rh7uby9fTc5lhaOWFLX3I6R1hrF9k3zUY40e6igsLGkDXK92LJNg== -ollama@^0.5.6: - version "0.5.6" - resolved "https://registry.yarnpkg.com/ollama/-/ollama-0.5.6.tgz#351bd08aca7a7c3a4f3aba710dfbbefa784be1e6" - integrity sha512-4BySAMt96+OCt4emL6DE78UBCGqC7GvteM9LRCd6WwJyefn0x9w2BrcUcLm9nJ9bYpRsmkhf0Au18Q5MhsA14w== - dependencies: - whatwg-fetch "^3.6.20" - on-exit-leak-free@^0.2.0: version "0.2.0" resolved "https://registry.yarnpkg.com/on-exit-leak-free/-/on-exit-leak-free-0.2.0.tgz#b39c9e3bf7690d890f4861558b0d7b90a442d209" @@ -8821,11 +8814,6 @@ whatwg-encoding@^1.0.5: dependencies: iconv-lite "0.4.24" -whatwg-fetch@^3.6.20: - version "3.6.20" - resolved "https://registry.yarnpkg.com/whatwg-fetch/-/whatwg-fetch-3.6.20.tgz#580ce6d791facec91d37c72890995a0b48d31c70" - integrity sha512-EqhiFU6daOA8kpjOWTL0olhVOF3i7OrFzSYiGsEMB8GcXS+RrzauAERX65xMeNWVqxA6HXH2m69Z9LaKKdisfg== - whatwg-mimetype@^2.3.0: version "2.3.0" resolved "https://registry.yarnpkg.com/whatwg-mimetype/-/whatwg-mimetype-2.3.0.tgz#3d4b1e0312d2079879f826aff18dbeeca5960fbf" From 156a89241bb0ade4c50a758bb3e023047874bae3 Mon Sep 17 00:00:00 2001 From: luke <123917244@qq.com> Date: Thu, 15 Aug 2024 12:04:33 +0800 Subject: [PATCH 15/18] feat: remove todo --- apps/indexer-coordinator/src/project/project.resolver.ts | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/apps/indexer-coordinator/src/project/project.resolver.ts b/apps/indexer-coordinator/src/project/project.resolver.ts index 9b930d07d..bd1193e41 100644 --- a/apps/indexer-coordinator/src/project/project.resolver.ts +++ b/apps/indexer-coordinator/src/project/project.resolver.ts @@ -64,9 +64,7 @@ export class ProjectResolver { ) { let project: Project; if (projectType === undefined) { - // todo: remove - // projectType = await this.projectService.getProjectType(id); - projectType = ProjectType.LLM; + projectType = await this.projectService.getProjectType(id); } switch (projectType) { case ProjectType.SUBQUERY: @@ -203,9 +201,7 @@ export class ProjectResolver { ): Promise { const manifest = new AggregatedManifest(); if (projectType === undefined) { - // todo: remove - // projectType = await this.projectService.getProjectType(projectId); - projectType = ProjectType.LLM; + projectType = await this.projectService.getProjectType(projectId); } switch (projectType) { case ProjectType.SUBQUERY: From 7bf4dd65f1eb37ae148da98e4592bc458ea90cd8 Mon Sep 17 00:00:00 2001 From: luke <123917244@qq.com> Date: Mon, 19 Aug 2024 11:43:13 +0800 Subject: [PATCH 16/18] fix: fix remove project --- .../src/project/project.llm.service.ts | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/apps/indexer-coordinator/src/project/project.llm.service.ts b/apps/indexer-coordinator/src/project/project.llm.service.ts index dad28b496..eeb247edf 100644 --- a/apps/indexer-coordinator/src/project/project.llm.service.ts +++ b/apps/indexer-coordinator/src/project/project.llm.service.ts @@ -133,15 +133,17 @@ export class ProjectLLMService { async removeLLMProject(id: string): Promise { const project = await this.projectService.getProject(id); if (!project) return []; - const endpoints = project.serviceEndpoints; - const host = endpoints[0]?.value; const manifest = project.manifest as LLMManifest; const targetModel = manifest?.model?.name; + + const endpoints = project.projectConfig.serviceEndpoints; + const host = endpoints[0]?.value; + if (host && targetModel) { - const ollama = new Ollama({ host }); - await ollama.delete({ model: targetModel }); + await this.deleteModel(host, targetModel); } - return this.projectRepo.remove([project]); + const res = await this.projectRepo.remove([project]); + return res; } async validate(host): Promise { @@ -193,7 +195,9 @@ export class ProjectLLMService { try { const ollama = new Ollama({ host }); await ollama.delete({ model }); - } catch (err) {} + } catch (err) { + logger.warn(`delete model error: ${err.message}`); + } const onPulling = this.ongoingStreamedRequests.find((iterator) => { return iterator.meta.host === host && iterator.meta.model === model; }); From d23e19adfe0726e4c91eab0f388e8ea64e4f8712 Mon Sep 17 00:00:00 2001 From: luke <123917244@qq.com> Date: Mon, 19 Aug 2024 17:49:02 +0800 Subject: [PATCH 17/18] feat: keep model while remove project --- .../src/project/project.llm.service.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/apps/indexer-coordinator/src/project/project.llm.service.ts b/apps/indexer-coordinator/src/project/project.llm.service.ts index eeb247edf..28c3a1ad1 100644 --- a/apps/indexer-coordinator/src/project/project.llm.service.ts +++ b/apps/indexer-coordinator/src/project/project.llm.service.ts @@ -133,15 +133,15 @@ export class ProjectLLMService { async removeLLMProject(id: string): Promise { const project = await this.projectService.getProject(id); if (!project) return []; - const manifest = project.manifest as LLMManifest; - const targetModel = manifest?.model?.name; + // const manifest = project.manifest as LLMManifest; + // const targetModel = manifest?.model?.name; - const endpoints = project.projectConfig.serviceEndpoints; - const host = endpoints[0]?.value; + // const endpoints = project.projectConfig.serviceEndpoints; + // const host = endpoints[0]?.value; - if (host && targetModel) { - await this.deleteModel(host, targetModel); - } + // if (host && targetModel) { + // await this.deleteModel(host, targetModel); + // } const res = await this.projectRepo.remove([project]); return res; } From 68850bfad6dced7058b13831f4f323f365d4927f Mon Sep 17 00:00:00 2001 From: luke <123917244@qq.com> Date: Tue, 20 Aug 2024 09:12:25 +0800 Subject: [PATCH 18/18] feat: add show endpoint --- apps/indexer-coordinator/src/project/project.llm.service.ts | 6 ++++++ apps/indexer-coordinator/src/project/types.ts | 2 ++ 2 files changed, 8 insertions(+) diff --git a/apps/indexer-coordinator/src/project/project.llm.service.ts b/apps/indexer-coordinator/src/project/project.llm.service.ts index 28c3a1ad1..b0252df9c 100644 --- a/apps/indexer-coordinator/src/project/project.llm.service.ts +++ b/apps/indexer-coordinator/src/project/project.llm.service.ts @@ -117,6 +117,12 @@ export class ProjectLLMService { this.nodeEndpoint(host, '/v1/chat/completions'), LLMEndpointAccessType[LLMEndpointType.ApiGenerateEndpoint] ), + + new SeviceEndpoint( + LLMEndpointType.AdminShowEndpoint, + this.nodeEndpoint(host, '/api/show'), + LLMEndpointAccessType[LLMEndpointType.AdminShowEndpoint] + ), ]; return await this.projectRepo.save(project); } diff --git a/apps/indexer-coordinator/src/project/types.ts b/apps/indexer-coordinator/src/project/types.ts index e723a1ff4..bf26947c6 100644 --- a/apps/indexer-coordinator/src/project/types.ts +++ b/apps/indexer-coordinator/src/project/types.ts @@ -129,6 +129,7 @@ export enum SubgraphEndpointType { export enum LLMEndpointType { ApiGenerateEndpoint = 'api-generate-endpoint', + AdminShowEndpoint = 'api-show-endpoint', } export const SubgraphEndpointAccessType = { @@ -141,6 +142,7 @@ export const SubgraphEndpointAccessType = { export const LLMEndpointAccessType = { [LLMEndpointType.ApiGenerateEndpoint]: AccessType.DEFAULT, + [LLMEndpointType.AdminShowEndpoint]: AccessType.INTERNAL, }; @InputType('SubgraphPort')