Skip to content

Commit

Permalink
Fix #342
Browse files Browse the repository at this point in the history
  • Loading branch information
haf committed Jun 7, 2018
1 parent 71f4844 commit 4ad2257
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 41 deletions.
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#### 5.0.0-beta.15
* Fix #341 — template from context as well as fields
* Fix #342 — improve stackdriver perf

#### 5.0.0-beta.14
* Fix #340 — Pass value and formatted values
Expand Down
155 changes: 114 additions & 41 deletions src/targets/Logary.Targets.Stackdriver/Target_Stackdriver.fs
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,17 @@ type StackdriverConf =
/// Additional user labels to be added to the messages
labels: HashMap<string,string>
/// Maximum messages to send as a batch.
maxBatchSize: uint16 }
maxBatchSize: uint16
/// Max # of inflight requests
maxInflight: uint16 }

static member create (projectId, ?logId, ?resource, ?labels, ?batchSize) =
static member create (projectId, ?logId, ?resource, ?labels, ?batchSize, ?maxInflight) =
{ projectId = projectId
logId = defaultArg logId "apps"
resource = defaultArg resource Global
labels = defaultArg labels HashMap.empty
maxBatchSize = defaultArg batchSize 50us }
maxBatchSize = defaultArg batchSize 50us
maxInflight = defaultArg maxInflight 5us }

let empty: StackdriverConf =
StackdriverConf.create "CHANGE_PROJECT_ID"
Expand Down Expand Up @@ -226,23 +229,82 @@ module internal Impl =
entry.Labels.Add labels
entry

module Bound =
type private BoundMessage =
| Acquire of replyCh: Ch<unit> * nack: Promise<unit>
| Release

type T = private { commCh: Ch<BoundMessage> }

let create (maxPar: uint32) =
let comm = Ch ()
let q = Queue<Ch<unit> * Promise<unit>>()
let iter =
Job.iterateServer 0u <| fun inflight ->
comm ^=> function
| Acquire (replyCh, nack) when inflight = maxPar ->
printfn "Acquire, Inflight=%i (maxPar), q=%i" inflight q.Count
q.Enqueue (replyCh, nack)
Alt.always inflight

| Acquire (replyCh, nack) ->
printfn "Acquire, Inflight=%i, q=%i" inflight q.Count
Alt.choosy [| replyCh *<- () ^->. inflight + 1u
nack ^->. inflight |]

| Release when inflight <> 0u && q.Count > 0 ->
printfn "Release (inflight <> 0, q <> 0), q=%i" q.Count
let replyCh, nack = q.Dequeue()
Alt.choosy [| replyCh *<- () ^->. inflight
nack ^->. inflight - 1u |]

| Release when inflight <> 0u ->
printfn "Release (inflight <> 0), q=%i" q.Count
Alt.always (inflight - 1u)

| Release ->
failwithf "ERROR Release, Inflight=%i, q=%i" inflight q.Count

iter >>-. { commCh = comm }

let acquire x =
x.commCh *<+->- fun replyCh nack -> Acquire (replyCh, nack)

let release x =
x.commCh *<- Release

let collect (messages: _) =
// Make a single pass over the array, accumulating the entries, acks and flushes.
messages |> Array.fold (fun (entries, acks, flushes) -> function
| Log (message, ack) ->
message.toLogEntry() :: entries,
ack *<= () :: acks,
flushes
| Flush (ackCh, nack) ->
entries,
acks,
ackCh *<= () :: flushes)
([], [], [])

type State =
{ logName: string
logger: LoggingServiceV2Client
resource: MonitoredResource
labels: IDictionary<string, string>
cancellation: System.Threading.CancellationTokenSource
/// used as a wrapper around the above cancellation token
callSettings: CallSettings }
callSettings: CallSettings
bound: Bound.T }

static member create (conf: StackdriverConf) =
static member create (conf: StackdriverConf) bound =
let source = new System.Threading.CancellationTokenSource()
{ logger = LoggingServiceV2Client.Create()
resource = conf.resource.toMonitoredResource conf.projectId
labels = conf.labels |> HashMap.toDictionary
logName = sprintf "projects/%s/logs/%s" conf.projectId (System.Net.WebUtility.UrlEncode conf.logId)
cancellation = source
callSettings = CallSettings.FromCancellationToken(source.Token) }
callSettings = CallSettings.FromCancellationToken(source.Token)
bound = bound}

let writeBatch (state: State) (entries: LogEntry list): Job<unit> =
Job.Scheduler.isolate (fun () ->
Expand All @@ -260,45 +322,56 @@ module internal Impl =
>> setField "projectId" conf.projectId
>> setField "logName" conf.logId)

let rec loop (state: State): Job<unit> =
Alt.choose [
// either shutdown, or
api.shutdownCh ^=> fun ack ->
logger.verbose (eventX "Shutting down Stackdriver target.")
state.cancellation.Cancel()
ack *<= () :> Job<_>

RingBuffer.takeBatch conf.maxBatchSize api.requests ^=> fun messages ->
let entries, acks, flushes =
// Make a single pass over the array, accumulating the entries, acks and flushes.
messages |> Array.fold (fun (entries, acks, flushes) -> function
| Log (message, ack) ->
message.toLogEntry() :: entries,
ack *<= () :: acks,
flushes
| Flush (ackCh, nack) ->
entries,
acks,
ackCh *<= () :: flushes)
([], [], [])

job {
do logger.verbose (eventX "Writing {count} messages." >> setField "count" entries.Length)
do! writeBatch state entries
do logger.verbose (eventX "Acking messages.")
do! Job.conIgnore acks
do! Job.conIgnore flushes
return! loop state
}

] :> Job<_>

loop (State.create conf)
let shutdown (state: State) =
api.shutdownCh ^=> fun ack ->
logger.verbose (eventX "Shutting down Stackdriver target.")
state.cancellation.Cancel()
ack *<= () :> Job<_>

let takeAndWriteBatch (state: State): Alt<State> =
RingBuffer.takeBatch conf.maxBatchSize api.requests ^=> fun messages ->
let entries, acks, flushes = collect messages

let doWrite =
logger.timeJob (
writeBatch state entries,
measurement="writeBatch",
transform=(setValue "Writing {count} messages." >> setField "count" entries.Length))

let doAcks =
logger.verboseWithBP (eventX "Acking messages.")
>>=. Job.conIgnore acks

let onDone =
Bound.release state.bound

Job.start (doWrite >>=. doAcks >>=. Job.conIgnore flushes >>=. onDone)
>>-. state

let rec create (): Job<unit> =
Bound.create (uint32 conf.maxInflight)
>>- State.create conf
>>= loop

and loop (state: State): Job<unit> =
let alternative =
shutdown state
<|> Bound.acquire state.bound ^=> fun () ->
Alt.choose [
takeAndWriteBatch state ^=> loop
timeOutMillis 10000 ^=> fun () ->
Bound.release state.bound ^=> fun () ->
loop state
]
alternative :> Job<_>

create ()

/// Create a new StackDriver target
[<CompiledName "Create">]
let create conf name =
TargetConf.createSimple (Impl.loop conf) name
{ TargetConf.createSimple (Impl.loop conf) name with
bufferSize = 2048us }

type Builder(conf, callParent: Target.ParentCallback<Builder>) =

Expand Down
18 changes: 18 additions & 0 deletions src/tests/Logary.Targets.Stackdriver.Tests/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,24 @@ let target =

do! flush target
}

testCaseJob "10k messages" <| job {
let targetConf = Stackdriver.create stackdriver.Value "logary-stackdriver"
let! target = Target.create ri targetConf

let ex = raisedExn "boohoo"
for i in 1..10_000 do
let message =
event LogLevel.Info "Stress test {blah}"
|> setField "blah" 12345
|> setContext "zone" "foobar"
|> addExn ex
let! inBuffer = Target.log target message
do inBuffer |> function Ok _ -> ()
| Result.Error e -> failtestf "Failure placing in buffer %A" e

do! flush target
}
]

[<EntryPoint>]
Expand Down

0 comments on commit 4ad2257

Please sign in to comment.