Skip to content

Commit

Permalink
add example of transformation plugin for #1286
Browse files Browse the repository at this point in the history
  • Loading branch information
mathieuancelin committed Feb 5, 2024
1 parent 2edc3e0 commit 0a7a620
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 15 deletions.
20 changes: 8 additions & 12 deletions otoroshi/app/gateway/websockets.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import otoroshi.events._
import otoroshi.models._
import otoroshi.next.models.{NgContextualPlugins, NgPluginInstance, NgRoute}
import otoroshi.next.plugins.RejectStrategy
import otoroshi.next.plugins.api.{NgAccess, NgPluginWrapper, NgWebsocketError, NgWebsocketPluginContext, NgWebsocketResponse, NgWebsocketValidatorPlugin, WebsocketMessage}
import otoroshi.next.plugins.api.{NgAccess, NgPluginWrapper, NgWebsocketError, NgWebsocketPlugin, NgWebsocketPluginContext, NgWebsocketResponse, NgWebsocketValidatorPlugin, WebsocketMessage}
import otoroshi.next.proxy.NgProxyEngineError
import otoroshi.next.proxy.NgProxyEngineError.NgResultProxyEngineError
import otoroshi.next.utils.FEither
Expand Down Expand Up @@ -922,23 +922,19 @@ class WebSocketProxyActor(

class WebsocketEngine(route: NgRoute, ctxPlugins: NgContextualPlugins, rawRequest: RequestHeader, target: Target) {

private def getValidators()(f: NgPluginWrapper.NgSimplePluginWrapper[NgWebsocketValidatorPlugin] => Boolean): Seq[NgPluginWrapper.NgSimplePluginWrapper[NgWebsocketValidatorPlugin]] = {
private def getPlugins()(f: NgPluginWrapper.NgSimplePluginWrapper[NgWebsocketPlugin] => Boolean): Seq[NgPluginWrapper.NgSimplePluginWrapper[NgWebsocketPlugin]] = {
ctxPlugins.websocketPlugins
.collect {
case p if p.plugin.isInstanceOf[NgWebsocketValidatorPlugin] =>
NgPluginWrapper.NgSimplePluginWrapper[NgWebsocketValidatorPlugin](instance = p.instance, plugin = p.plugin.asInstanceOf[NgWebsocketValidatorPlugin])
}
.filter(f)
}

private def handle[A](validators: Seq[NgPluginWrapper.NgSimplePluginWrapper[NgWebsocketValidatorPlugin]],
private def handle[A](validators: Seq[NgPluginWrapper.NgSimplePluginWrapper[NgWebsocketPlugin]],
data: WebsocketMessage,
applyResponseFilter: Boolean = false)(closeConnection: NgWebsocketResponse => Unit)
(implicit env: Env, ec: ExecutionContext): Future[Either[NgWebsocketError, WebsocketMessage]] = {

val promise = Promise[Either[NgWebsocketError, WebsocketMessage]]()

def next(current: WebsocketMessage, plugins: Seq[NgPluginWrapper[NgWebsocketValidatorPlugin]]): Unit = {
def next(current: WebsocketMessage, plugins: Seq[NgPluginWrapper[NgWebsocketPlugin]]): Unit = {
plugins.headOption match {
case None => promise.trySuccess(Right(current))
case Some(wrapper) =>
Expand Down Expand Up @@ -982,23 +978,23 @@ class WebsocketEngine(route: NgRoute, ctxPlugins: NgContextualPlugins, rawReques
promise.future
}

def handleRequest[A](data: play.api.http.websocket.Message)(closeConnection: NgWebsocketResponse => Unit)
def handleRequest(data: play.api.http.websocket.Message)(closeConnection: NgWebsocketResponse => Unit)
(implicit env: Env, ec: ExecutionContext): Future[Either[NgWebsocketError, WebsocketMessage]] = {
if (ctxPlugins.hasNoWebsocketPlugins) {
val r: Either[NgWebsocketError, WebsocketMessage] = Right[NgWebsocketError, WebsocketMessage](WebsocketMessage.PlayMessage(data))
r.vfuture
} else {
val requestValidators: Seq[NgPluginWrapper.NgSimplePluginWrapper[NgWebsocketValidatorPlugin]] = getValidators()(_.plugin.onRequestFlow)
val requestValidators: Seq[NgPluginWrapper.NgSimplePluginWrapper[NgWebsocketPlugin]] = getPlugins()(_.plugin.onRequestFlow)
handle(requestValidators, WebsocketMessage.PlayMessage(data))(closeConnection)
}
}

def handleResponse[A](data: akka.http.scaladsl.model.ws.Message)(closeConnection: NgWebsocketResponse => Unit)
def handleResponse(data: akka.http.scaladsl.model.ws.Message)(closeConnection: NgWebsocketResponse => Unit)
(implicit env: Env, ec: ExecutionContext): Future[Either[NgWebsocketError, WebsocketMessage]] = {
if (ctxPlugins.hasNoWebsocketPlugins) {
WebsocketMessage.AkkaMessage(data).rightf[NgWebsocketError]
} else {
val responseValidators = getValidators()(_.plugin.onResponseFlow)
val responseValidators = getPlugins()(_.plugin.onResponseFlow)
handle(responseValidators, WebsocketMessage.AkkaMessage(data), applyResponseFilter = true)(closeConnection)
}
}
Expand Down
5 changes: 2 additions & 3 deletions otoroshi/app/next/plugins/api.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1387,15 +1387,14 @@ object NgWebsocketResponse {
}

trait NgWebsocketPlugin extends NgNamedPlugin {
def rejectStrategy(ctx: NgWebsocketPluginContext): RejectStrategy = RejectStrategy.Drop
def onRequestFlow: Boolean = false
def onResponseFlow: Boolean = false
def onRequestMessage(ctx: NgWebsocketPluginContext, message: WebsocketMessage)(implicit env: Env, ec: ExecutionContext): Future[Either[NgWebsocketError, WebsocketMessage]] = message.rightf
def onResponseMessage(ctx: NgWebsocketPluginContext, message: WebsocketMessage)(implicit env: Env, ec: ExecutionContext): Future[Either[NgWebsocketError, WebsocketMessage]] = message.rightf
}

trait NgWebsocketValidatorPlugin extends NgWebsocketPlugin {
def rejectStrategy(ctx: NgWebsocketPluginContext): RejectStrategy
}
trait NgWebsocketValidatorPlugin extends NgWebsocketPlugin {}

case class NgWebsocketError(statusCode: Option[Int] = None, reason: Option[String] = None, rejectStrategy: Option[RejectStrategy])
object NgWebsocketError {
Expand Down
86 changes: 86 additions & 0 deletions otoroshi/app/next/plugins/websocket.scala
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
package otoroshi.next.plugins

import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.arakelian.jq.{ImmutableJqLibrary, ImmutableJqRequest}
import com.networknt.schema.SpecVersion.VersionFlag
import com.networknt.schema.{InputFormat, JsonSchemaFactory, PathType, SchemaValidatorsConfig}
import otoroshi.env.Env
import otoroshi.gateway.Errors
import otoroshi.next.plugins.api._
import otoroshi.utils.JsonPathValidator
import otoroshi.utils.syntax.implicits._
import play.api.Logger
import play.api.http.websocket.CloseCodes
import play.api.libs.json._
import play.api.mvc.Results

import java.nio.charset.StandardCharsets
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.CollectionConverters.asScalaBufferConverter
import scala.util._

sealed trait RejectStrategy {
Expand Down Expand Up @@ -354,3 +358,85 @@ class WebsocketSizeValidator extends NgWebsocketValidatorPlugin {
internalCanAccess(ctx, message, config.upstreamMaxPayload, reason = "upstream payload limit exceeded")
}
}

case class JqWebsocketMessageTransformerConfig(requestFilter: String = ".", responseFilter: String = ".") extends NgPluginConfig {
override def json: JsValue = JqWebsocketMessageTransformerConfig.format.writes(this)
}
object JqWebsocketMessageTransformerConfig {
val format = new Format[JqWebsocketMessageTransformerConfig] {
override def reads(json: JsValue): JsResult[JqWebsocketMessageTransformerConfig] = Try {
JqWebsocketMessageTransformerConfig(
requestFilter = json.select("request_filter").asOpt[String].getOrElse("."),
responseFilter = json.select("response_filter").asOpt[String].getOrElse("."),
)
} match {
case Failure(e) => JsError(e.getMessage)
case Success(s) => JsSuccess(s)
}
override def writes(o: JqWebsocketMessageTransformerConfig): JsValue = Json.obj(
"request_filter" -> o.requestFilter,
"response_filter" -> o.responseFilter,
)
}
}

class JqWebsocketMessageTransformer extends NgWebsocketPlugin {

private val library = ImmutableJqLibrary.of()
private val logger = Logger("otoroshi-plugins-jq-websocket")

override def multiInstance: Boolean = true
override def defaultConfigObject: Option[NgPluginConfig] = Some(JqWebsocketMessageTransformerConfig())
override def core: Boolean = false
override def name: String = "Websocket JQ transformer"
override def description: Option[String] = "Transform messages JSON content using JQ filters".some
override def visibility: NgPluginVisibility = NgPluginVisibility.NgUserLand
override def categories: Seq[NgPluginCategory] = Seq(NgPluginCategory.Websocket)
override def steps: Seq[NgStep] = Seq(NgStep.TransformResponse)

override def onRequestFlow: Boolean = true
override def onResponseFlow: Boolean = true

override def onRequestMessage(ctx: NgWebsocketPluginContext, message: WebsocketMessage)(implicit env: Env, ec: ExecutionContext): Future[Either[NgWebsocketError, WebsocketMessage]] = {
val config = ctx.cachedConfig(internalName)(JqWebsocketMessageTransformerConfig.format).getOrElse(JqWebsocketMessageTransformerConfig())
onMessage(ctx, message, config.requestFilter)
}

override def onResponseMessage(ctx: NgWebsocketPluginContext, message: WebsocketMessage)(implicit env: Env, ec: ExecutionContext): Future[Either[NgWebsocketError, WebsocketMessage]] = {
val config = ctx.cachedConfig(internalName)(JqWebsocketMessageTransformerConfig.format).getOrElse(JqWebsocketMessageTransformerConfig())
onMessage(ctx, message, config.responseFilter)
}

def onMessage(ctx: NgWebsocketPluginContext, message: WebsocketMessage, filter: String)(implicit env: Env, ec: ExecutionContext): Future[Either[NgWebsocketError, WebsocketMessage]] = {
implicit val mat = env.otoroshiMaterializer
if (message.isText) {
message.str().flatMap { bodyStr =>
Try(Json.parse(bodyStr)) match {
case Failure(e) => Right(message).vfuture
case Success(_) => {
val request = ImmutableJqRequest
.builder()
.lib(library)
.input(bodyStr)
.putArgJson("context", ctx.json.stringify)
.filter(filter)
.build()
val response = request.execute()
if (response.hasErrors) {
logger.error(
s"error while transforming response body:\n${response.getErrors.asScala.mkString("\n")}"
)
val errors = JsArray(response.getErrors.asScala.map(err => JsString(err)))
Right(WebsocketMessage.PlayMessage(play.api.http.websocket.TextMessage(errors.stringify))).vfuture
} else {
val rawBody = response.getOutput
Right(WebsocketMessage.PlayMessage(play.api.http.websocket.TextMessage(rawBody))).vfuture
}
}
}
}
} else {
Right(message).vfuture
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
export default {
id: 'cp:otoroshi.next.plugins.JqWebsocketMessageTransformer',
config_schema: {
request_filter: {
label: 'Request JQ filter',
type: 'string',
},
response_filter: {
label: 'Response JQ filter',
type: 'string',
},
},
config_flow: ['request_filter', 'response_filter'],
};
2 changes: 2 additions & 0 deletions otoroshi/javascript/src/forms/ng_plugins/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ import WebsocketContentValidatorIn from './WebsocketContentValidatorIn';
import WebsocketJsonFormatValidator from './WebsocketJsonFormatValidator';
import WebsocketSizeValidator from './WebsocketSizeValidator';
import WebsocketTypeValidator from './WebsocketTypeValidator';
import JqWebsocketMessageTransformer from './JqWebsocketMessageTransformer';
import ZipFileBackend from './ZipFileBackend';

export const Backend = NgBackend;
Expand Down Expand Up @@ -286,6 +287,7 @@ const pluginsArray = [
WebsocketJsonFormatValidator,
WebsocketTypeValidator,
WebsocketSizeValidator,
JqWebsocketMessageTransformer,
ZipFileBackend,
];

Expand Down

0 comments on commit 0a7a620

Please sign in to comment.