diff --git a/otoroshi/app/gateway/websockets.scala b/otoroshi/app/gateway/websockets.scala index 1fbb9f075..4e8c243f7 100644 --- a/otoroshi/app/gateway/websockets.scala +++ b/otoroshi/app/gateway/websockets.scala @@ -18,7 +18,7 @@ import otoroshi.events._ import otoroshi.models._ import otoroshi.next.models.{NgContextualPlugins, NgPluginInstance, NgRoute} import otoroshi.next.plugins.RejectStrategy -import otoroshi.next.plugins.api.{NgAccess, NgPluginWrapper, NgWebsocketError, NgWebsocketPluginContext, NgWebsocketResponse, NgWebsocketValidatorPlugin, WebsocketMessage} +import otoroshi.next.plugins.api.{NgAccess, NgPluginWrapper, NgWebsocketError, NgWebsocketPlugin, NgWebsocketPluginContext, NgWebsocketResponse, NgWebsocketValidatorPlugin, WebsocketMessage} import otoroshi.next.proxy.NgProxyEngineError import otoroshi.next.proxy.NgProxyEngineError.NgResultProxyEngineError import otoroshi.next.utils.FEither @@ -922,23 +922,19 @@ class WebSocketProxyActor( class WebsocketEngine(route: NgRoute, ctxPlugins: NgContextualPlugins, rawRequest: RequestHeader, target: Target) { - private def getValidators()(f: NgPluginWrapper.NgSimplePluginWrapper[NgWebsocketValidatorPlugin] => Boolean): Seq[NgPluginWrapper.NgSimplePluginWrapper[NgWebsocketValidatorPlugin]] = { + private def getPlugins()(f: NgPluginWrapper.NgSimplePluginWrapper[NgWebsocketPlugin] => Boolean): Seq[NgPluginWrapper.NgSimplePluginWrapper[NgWebsocketPlugin]] = { ctxPlugins.websocketPlugins - .collect { - case p if p.plugin.isInstanceOf[NgWebsocketValidatorPlugin] => - NgPluginWrapper.NgSimplePluginWrapper[NgWebsocketValidatorPlugin](instance = p.instance, plugin = p.plugin.asInstanceOf[NgWebsocketValidatorPlugin]) - } .filter(f) } - private def handle[A](validators: Seq[NgPluginWrapper.NgSimplePluginWrapper[NgWebsocketValidatorPlugin]], + private def handle[A](validators: Seq[NgPluginWrapper.NgSimplePluginWrapper[NgWebsocketPlugin]], data: WebsocketMessage, applyResponseFilter: Boolean = false)(closeConnection: NgWebsocketResponse => Unit) (implicit env: Env, ec: ExecutionContext): Future[Either[NgWebsocketError, WebsocketMessage]] = { val promise = Promise[Either[NgWebsocketError, WebsocketMessage]]() - def next(current: WebsocketMessage, plugins: Seq[NgPluginWrapper[NgWebsocketValidatorPlugin]]): Unit = { + def next(current: WebsocketMessage, plugins: Seq[NgPluginWrapper[NgWebsocketPlugin]]): Unit = { plugins.headOption match { case None => promise.trySuccess(Right(current)) case Some(wrapper) => @@ -982,23 +978,23 @@ class WebsocketEngine(route: NgRoute, ctxPlugins: NgContextualPlugins, rawReques promise.future } - def handleRequest[A](data: play.api.http.websocket.Message)(closeConnection: NgWebsocketResponse => Unit) + def handleRequest(data: play.api.http.websocket.Message)(closeConnection: NgWebsocketResponse => Unit) (implicit env: Env, ec: ExecutionContext): Future[Either[NgWebsocketError, WebsocketMessage]] = { if (ctxPlugins.hasNoWebsocketPlugins) { val r: Either[NgWebsocketError, WebsocketMessage] = Right[NgWebsocketError, WebsocketMessage](WebsocketMessage.PlayMessage(data)) r.vfuture } else { - val requestValidators: Seq[NgPluginWrapper.NgSimplePluginWrapper[NgWebsocketValidatorPlugin]] = getValidators()(_.plugin.onRequestFlow) + val requestValidators: Seq[NgPluginWrapper.NgSimplePluginWrapper[NgWebsocketPlugin]] = getPlugins()(_.plugin.onRequestFlow) handle(requestValidators, WebsocketMessage.PlayMessage(data))(closeConnection) } } - def handleResponse[A](data: akka.http.scaladsl.model.ws.Message)(closeConnection: NgWebsocketResponse => Unit) + def handleResponse(data: akka.http.scaladsl.model.ws.Message)(closeConnection: NgWebsocketResponse => Unit) (implicit env: Env, ec: ExecutionContext): Future[Either[NgWebsocketError, WebsocketMessage]] = { if (ctxPlugins.hasNoWebsocketPlugins) { WebsocketMessage.AkkaMessage(data).rightf[NgWebsocketError] } else { - val responseValidators = getValidators()(_.plugin.onResponseFlow) + val responseValidators = getPlugins()(_.plugin.onResponseFlow) handle(responseValidators, WebsocketMessage.AkkaMessage(data), applyResponseFilter = true)(closeConnection) } } diff --git a/otoroshi/app/next/plugins/api.scala b/otoroshi/app/next/plugins/api.scala index d5378a409..b1f94dd1f 100644 --- a/otoroshi/app/next/plugins/api.scala +++ b/otoroshi/app/next/plugins/api.scala @@ -1387,15 +1387,14 @@ object NgWebsocketResponse { } trait NgWebsocketPlugin extends NgNamedPlugin { + def rejectStrategy(ctx: NgWebsocketPluginContext): RejectStrategy = RejectStrategy.Drop def onRequestFlow: Boolean = false def onResponseFlow: Boolean = false def onRequestMessage(ctx: NgWebsocketPluginContext, message: WebsocketMessage)(implicit env: Env, ec: ExecutionContext): Future[Either[NgWebsocketError, WebsocketMessage]] = message.rightf def onResponseMessage(ctx: NgWebsocketPluginContext, message: WebsocketMessage)(implicit env: Env, ec: ExecutionContext): Future[Either[NgWebsocketError, WebsocketMessage]] = message.rightf } -trait NgWebsocketValidatorPlugin extends NgWebsocketPlugin { - def rejectStrategy(ctx: NgWebsocketPluginContext): RejectStrategy -} +trait NgWebsocketValidatorPlugin extends NgWebsocketPlugin {} case class NgWebsocketError(statusCode: Option[Int] = None, reason: Option[String] = None, rejectStrategy: Option[RejectStrategy]) object NgWebsocketError { diff --git a/otoroshi/app/next/plugins/websocket.scala b/otoroshi/app/next/plugins/websocket.scala index f51513b5b..747f90b86 100644 --- a/otoroshi/app/next/plugins/websocket.scala +++ b/otoroshi/app/next/plugins/websocket.scala @@ -1,6 +1,8 @@ package otoroshi.next.plugins import akka.stream.Materializer +import akka.stream.scaladsl.Source +import com.arakelian.jq.{ImmutableJqLibrary, ImmutableJqRequest} import com.networknt.schema.SpecVersion.VersionFlag import com.networknt.schema.{InputFormat, JsonSchemaFactory, PathType, SchemaValidatorsConfig} import otoroshi.env.Env @@ -8,12 +10,14 @@ import otoroshi.gateway.Errors import otoroshi.next.plugins.api._ import otoroshi.utils.JsonPathValidator import otoroshi.utils.syntax.implicits._ +import play.api.Logger import play.api.http.websocket.CloseCodes import play.api.libs.json._ import play.api.mvc.Results import java.nio.charset.StandardCharsets import scala.concurrent.{ExecutionContext, Future} +import scala.jdk.CollectionConverters.asScalaBufferConverter import scala.util._ sealed trait RejectStrategy { @@ -354,3 +358,85 @@ class WebsocketSizeValidator extends NgWebsocketValidatorPlugin { internalCanAccess(ctx, message, config.upstreamMaxPayload, reason = "upstream payload limit exceeded") } } + +case class JqWebsocketMessageTransformerConfig(requestFilter: String = ".", responseFilter: String = ".") extends NgPluginConfig { + override def json: JsValue = JqWebsocketMessageTransformerConfig.format.writes(this) +} +object JqWebsocketMessageTransformerConfig { + val format = new Format[JqWebsocketMessageTransformerConfig] { + override def reads(json: JsValue): JsResult[JqWebsocketMessageTransformerConfig] = Try { + JqWebsocketMessageTransformerConfig( + requestFilter = json.select("request_filter").asOpt[String].getOrElse("."), + responseFilter = json.select("response_filter").asOpt[String].getOrElse("."), + ) + } match { + case Failure(e) => JsError(e.getMessage) + case Success(s) => JsSuccess(s) + } + override def writes(o: JqWebsocketMessageTransformerConfig): JsValue = Json.obj( + "request_filter" -> o.requestFilter, + "response_filter" -> o.responseFilter, + ) + } +} + +class JqWebsocketMessageTransformer extends NgWebsocketPlugin { + + private val library = ImmutableJqLibrary.of() + private val logger = Logger("otoroshi-plugins-jq-websocket") + + override def multiInstance: Boolean = true + override def defaultConfigObject: Option[NgPluginConfig] = Some(JqWebsocketMessageTransformerConfig()) + override def core: Boolean = false + override def name: String = "Websocket JQ transformer" + override def description: Option[String] = "Transform messages JSON content using JQ filters".some + override def visibility: NgPluginVisibility = NgPluginVisibility.NgUserLand + override def categories: Seq[NgPluginCategory] = Seq(NgPluginCategory.Websocket) + override def steps: Seq[NgStep] = Seq(NgStep.TransformResponse) + + override def onRequestFlow: Boolean = true + override def onResponseFlow: Boolean = true + + override def onRequestMessage(ctx: NgWebsocketPluginContext, message: WebsocketMessage)(implicit env: Env, ec: ExecutionContext): Future[Either[NgWebsocketError, WebsocketMessage]] = { + val config = ctx.cachedConfig(internalName)(JqWebsocketMessageTransformerConfig.format).getOrElse(JqWebsocketMessageTransformerConfig()) + onMessage(ctx, message, config.requestFilter) + } + + override def onResponseMessage(ctx: NgWebsocketPluginContext, message: WebsocketMessage)(implicit env: Env, ec: ExecutionContext): Future[Either[NgWebsocketError, WebsocketMessage]] = { + val config = ctx.cachedConfig(internalName)(JqWebsocketMessageTransformerConfig.format).getOrElse(JqWebsocketMessageTransformerConfig()) + onMessage(ctx, message, config.responseFilter) + } + + def onMessage(ctx: NgWebsocketPluginContext, message: WebsocketMessage, filter: String)(implicit env: Env, ec: ExecutionContext): Future[Either[NgWebsocketError, WebsocketMessage]] = { + implicit val mat = env.otoroshiMaterializer + if (message.isText) { + message.str().flatMap { bodyStr => + Try(Json.parse(bodyStr)) match { + case Failure(e) => Right(message).vfuture + case Success(_) => { + val request = ImmutableJqRequest + .builder() + .lib(library) + .input(bodyStr) + .putArgJson("context", ctx.json.stringify) + .filter(filter) + .build() + val response = request.execute() + if (response.hasErrors) { + logger.error( + s"error while transforming response body:\n${response.getErrors.asScala.mkString("\n")}" + ) + val errors = JsArray(response.getErrors.asScala.map(err => JsString(err))) + Right(WebsocketMessage.PlayMessage(play.api.http.websocket.TextMessage(errors.stringify))).vfuture + } else { + val rawBody = response.getOutput + Right(WebsocketMessage.PlayMessage(play.api.http.websocket.TextMessage(rawBody))).vfuture + } + } + } + } + } else { + Right(message).vfuture + } + } +} diff --git a/otoroshi/javascript/src/forms/ng_plugins/JqWebsocketMessageTransformer.js b/otoroshi/javascript/src/forms/ng_plugins/JqWebsocketMessageTransformer.js new file mode 100644 index 000000000..f67c07749 --- /dev/null +++ b/otoroshi/javascript/src/forms/ng_plugins/JqWebsocketMessageTransformer.js @@ -0,0 +1,14 @@ +export default { + id: 'cp:otoroshi.next.plugins.JqWebsocketMessageTransformer', + config_schema: { + request_filter: { + label: 'Request JQ filter', + type: 'string', + }, + response_filter: { + label: 'Response JQ filter', + type: 'string', + }, + }, + config_flow: ['request_filter', 'response_filter'], +}; diff --git a/otoroshi/javascript/src/forms/ng_plugins/index.js b/otoroshi/javascript/src/forms/ng_plugins/index.js index 75a35f886..f48cee520 100644 --- a/otoroshi/javascript/src/forms/ng_plugins/index.js +++ b/otoroshi/javascript/src/forms/ng_plugins/index.js @@ -140,6 +140,7 @@ import WebsocketContentValidatorIn from './WebsocketContentValidatorIn'; import WebsocketJsonFormatValidator from './WebsocketJsonFormatValidator'; import WebsocketSizeValidator from './WebsocketSizeValidator'; import WebsocketTypeValidator from './WebsocketTypeValidator'; +import JqWebsocketMessageTransformer from './JqWebsocketMessageTransformer'; import ZipFileBackend from './ZipFileBackend'; export const Backend = NgBackend; @@ -286,6 +287,7 @@ const pluginsArray = [ WebsocketJsonFormatValidator, WebsocketTypeValidator, WebsocketSizeValidator, + JqWebsocketMessageTransformer, ZipFileBackend, ];