Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make processingRetryTimeout configurable #3387

Merged
merged 5 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.AIModelsDir = flag.String("aiModelsDir", *cfg.AIModelsDir, "Set directory where AI model weights are stored")
cfg.AIRunnerImage = flag.String("aiRunnerImage", *cfg.AIRunnerImage, "[Deprecated] Specify the base Docker image for the AI runner. Example: livepeer/ai-runner:0.0.1. Use -aiRunnerImageOverrides instead.")
cfg.AIRunnerImageOverrides = flag.String("aiRunnerImageOverrides", *cfg.AIRunnerImageOverrides, `Specify overrides for the Docker images used by the AI runner. Example: '{"default": "livepeer/ai-runner:v1.0", "batch": {"text-to-speech": "livepeer/ai-runner:text-to-speech-v1.0"}, "live": {"another-pipeline": "livepeer/ai-runner:another-pipeline-v1.0"}}'`)
cfg.AIProcessingRetryTimeout = flag.Duration("aiProcessingRetryTimeout", *cfg.AIProcessingRetryTimeout, "Timeout for retrying to initiate AI processing request")

// Live AI:
cfg.MediaMTXApiPassword = flag.String("mediaMTXApiPassword", "", "HTTP basic auth password for MediaMTX API requests")
Expand Down
22 changes: 13 additions & 9 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@
TestOrchAvail *bool
AIRunnerImage *string
AIRunnerImageOverrides *string
AIProcessingRetryTimeout *time.Duration
KafkaBootstrapServers *string
KafkaUsername *string
KafkaPassword *string
Expand Down Expand Up @@ -215,6 +216,7 @@
defaultAIModels := ""
defaultAIModelsDir := ""
defaultAIRunnerImage := "livepeer/ai-runner:latest"
defaultAIProcessingRetryTimeout := 2 * time.Second

Check warning on line 219 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L219

Added line #L219 was not covered by tests
defaultAIRunnerImageOverrides := ""
defaultLiveAIAuthWebhookURL := ""
defaultLivePaymentInterval := 5 * time.Second
Expand Down Expand Up @@ -320,15 +322,16 @@
TestTranscoder: &defaultTestTranscoder,

// AI:
AIServiceRegistry: &defaultAIServiceRegistry,
AIWorker: &defaultAIWorker,
AIModels: &defaultAIModels,
AIModelsDir: &defaultAIModelsDir,
AIRunnerImage: &defaultAIRunnerImage,
AIRunnerImageOverrides: &defaultAIRunnerImageOverrides,
LiveAIAuthWebhookURL: &defaultLiveAIAuthWebhookURL,
LivePaymentInterval: &defaultLivePaymentInterval,
GatewayHost: &defaultGatewayHost,
AIServiceRegistry: &defaultAIServiceRegistry,
AIWorker: &defaultAIWorker,
AIModels: &defaultAIModels,
AIModelsDir: &defaultAIModelsDir,
AIRunnerImage: &defaultAIRunnerImage,
AIProcessingRetryTimeout: &defaultAIProcessingRetryTimeout,
AIRunnerImageOverrides: &defaultAIRunnerImageOverrides,
LiveAIAuthWebhookURL: &defaultLiveAIAuthWebhookURL,
LivePaymentInterval: &defaultLivePaymentInterval,
GatewayHost: &defaultGatewayHost,

Check warning on line 334 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L325-L334

Added lines #L325 - L334 were not covered by tests

// Onchain:
EthAcctAddr: &defaultEthAcctAddr,
Expand Down Expand Up @@ -513,6 +516,7 @@
if err != nil {
glog.Errorf("Error creating livepeer node: %v", err)
}
n.AIProcesssingRetryTimeout = *cfg.AIProcessingRetryTimeout

Check warning on line 519 in cmd/livepeer/starter/starter.go

View check run for this annotation

Codecov / codecov/patch

cmd/livepeer/starter/starter.go#L519

Added line #L519 was not covered by tests

if *cfg.OrchSecret != "" {
n.OrchSecret, _ = common.ReadFromFile(*cfg.OrchSecret)
Expand Down
5 changes: 3 additions & 2 deletions core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ type LivepeerNode struct {
Database *common.DB

// AI worker public fields
AIWorker AI
AIWorkerManager *RemoteAIWorkerManager
AIWorker AI
AIWorkerManager *RemoteAIWorkerManager
AIProcesssingRetryTimeout time.Duration

// Transcoder public fields
SegmentChans map[ManifestID]SegmentChan
Expand Down
2 changes: 1 addition & 1 deletion server/ai_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
"github.com/livepeer/lpms/stream"
)

const processingRetryTimeout = 2 * time.Second
const defaultTextToImageModelID = "stabilityai/sdxl-turbo"
const defaultImageToImageModelID = "stabilityai/sdxl-turbo"
const defaultImageToVideoModelID = "stabilityai/stable-video-diffusion-img2vid-xt"
Expand Down Expand Up @@ -1471,6 +1470,7 @@

var resp interface{}

processingRetryTimeout := params.node.AIProcesssingRetryTimeout

Check warning on line 1473 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L1473

Added line #L1473 was not covered by tests
cctx, cancel := context.WithTimeout(ctx, processingRetryTimeout)
defer cancel()

Expand Down
Loading