diff --git a/src/Adapter/AdapterAbstract.php b/src/Adapter/AdapterAbstract.php index d1579a5..39ad388 100644 --- a/src/Adapter/AdapterAbstract.php +++ b/src/Adapter/AdapterAbstract.php @@ -27,35 +27,43 @@ protected function messageToLineProtocol(array $message) return; } - if (!array_key_exists("tags", $message)) { - $message["tags"] = []; - } - + $message = $this->prepareMessageSection($message); $message["tags"] = array_replace_recursive($this->getOptions()->getTags(), $message["tags"]); - $unixepoch = (int)(microtime(true) * 1e9); - if (array_key_exists("time", $message)) { - $dt = new DateTime($message["time"]); - $unixepoch = (int)($dt->format("U") * 1e9); - } - $lines = []; foreach ($message["points"] as $point) { - $tags = $message["tags"]; - if (array_key_exists("tags", $point)) { - $tags = array_replace_recursive($tags, $point["tags"]); - } + $point = $this->prepareMessageSection($point, $message["time"]); + $tags = array_replace_recursive($message["tags"], $point["tags"]); $tagLine = $this->tagsToString($tags); $lines[] = sprintf( - "%s%s %s %d", $point["measurement"], $tagLine, $this->pointsToString($point["fields"]), $unixepoch + "%s%s %s %d", $point["measurement"], $tagLine, $this->pointsToString($point["fields"]), $point["time"] ); } return implode("\n", $lines); } + private function prepareMessageSection(array $message, $unixepoch = false) + { + if (!array_key_exists("tags", $message)) { + $message["tags"] = []; + } + + if (!$unixepoch) { + $unixepoch = (int)(microtime(true) * 1e9); + } + + if (array_key_exists("time", $message)) { + $dt = new DateTime($message["time"]); + $unixepoch = (int)($dt->format("U") * 1e9); + } + $message["time"] = $unixepoch; + + return $message; + } + protected function tagsToString(array $tags) { $tagLine = ""; diff --git a/tests/unit/Adapter/GuzzleAdapterTest.php b/tests/unit/Adapter/GuzzleAdapterTest.php index bad8675..3e531f3 100644 --- a/tests/unit/Adapter/GuzzleAdapterTest.php +++ b/tests/unit/Adapter/GuzzleAdapterTest.php @@ -66,6 +66,7 @@ public function getQueryEndpoints() ]; } + public function testMergeWithDefaultOptions() { $options = new Options(); @@ -79,154 +80,214 @@ public function testMergeWithDefaultOptions() ], "body" => null, ])->shouldBeCalledTimes(1); - $adapter = new InfluxHttpAdapter($httpClient->reveal(), $options); $adapter->send([]); } - public function testAdapterPrepareJsonDataCorrectly() + /** + * @dataProvider getMessages + */ + public function testMessageComposition($options, $send, $regexp) { $guzzleHttp = $this->prophesize("GuzzleHttp\Client"); - $guzzleHttp->post("http://localhost:8086/write", [ - "auth" => ["root", "root"], - "query" => [ - "db" => "db", - "retentionPolicy" => "default", - ], - "body" => 'tcp.test mark="element" 1257894000000000000', - ])->shouldBeCalledTimes(1); - $options = (new Options())->setDatabase("db"); + $guzzleHttp->post("http://localhost:8086/write", Argument::that(function($val) use ($regexp) { + $body = $val["body"]; + $this->assertRegExp($regexp, $body); + return true; + }))->shouldBeCalledTimes(1); $adapter = new InfluxHttpAdapter($guzzleHttp->reveal(), $options); - $adapter->send([ - "time" => "2009-11-10T23:00:00Z", - "points" => [ - [ - "measurement" => "tcp.test", - "fields" => [ - "mark" => "element" - ] - ] - ] - ]); + $adapter->send($send); } - public function testEmptyTagsFieldIsRemoved() + public function getMessages() { - $options = new Options(); - $options->setDatabase("db"); - $httpClient = $this->prophesize("GuzzleHttp\\Client"); - $httpClient->post(Argument::Any(), [ - "auth" => ["root", "root"], - "query" => [ - "db" => "db", - "retentionPolicy" => "default", + return [ + [ + (new Options())->setDatabase("db"), + [ + "time" => "2009-11-10T23:00:00Z", + "points" => [ + [ + "measurement" => "tcp.test", + "fields" => [ + "mark" => "element" + ] + ] + ] + ], + '/tcp.test mark="element" 1257894000000000000/i' ], - "body" => 'tcp.test mark="element" 1257894000000000000', - ])->shouldBeCalledTimes(1); - - $adapter = new InfluxHttpAdapter($httpClient->reveal(), $options); - $adapter->send([ - "time" => "2009-11-10T23:00:00Z", - "points" => [ + [ + (new Options())->setDatabase("db"), [ - "measurement" => "tcp.test", - "fields" => [ - "mark" => "element" + "points" => [ + [ + "measurement" => "tcp.test", + "fields" => [ + "mark" => "element" + ] + ], + [ + "measurement" => "tcp.test", + "fields" => [ + "mark" => "element2" + ] + ], ] - ] - ] - ]); - } - - public function testGlobalTagsAreInPlace() - { - $options = new Options(); - $options->setDatabase("db"); - $options->setTags([ - "dc" => "us-west", - ]); - $httpClient = $this->prophesize("GuzzleHttp\\Client"); - $httpClient->post(Argument::Any(), [ - "auth" => ["root", "root"], - "query" => [ - "db" => "db", - "retentionPolicy" => "default", + ], + '/tcp.test mark="element" \d+\ntcp.test mark="element2" \d+/i' ], - "body" => 'tcp.test,dc=us-west mark="element" 1257894000000000000', - ])->shouldBeCalledTimes(1); - - $adapter = new InfluxHttpAdapter($httpClient->reveal(), $options); - $adapter->send([ - "time" => "2009-11-10T23:00:00Z", - "points" => [ + [ + (new Options())->setDatabase("db"), [ - "measurement" => "tcp.test", - "fields" => [ - "mark" => "element" + "points" => [ + [ + "measurement" => "tcp.test", + "fields" => [ + "mark" => "element" + ] + ] ] - ] - ] - ]); - } - - public function testTagsFieldIsMergedWithGlobalTags() - { - $options = new Options(); - $options->setDatabase("db"); - $options->setTags([ - "dc" => "us-west", - ]); - $httpClient = $this->prophesize("GuzzleHttp\\Client"); - $httpClient->post(Argument::Any(), [ - "auth" => ["root", "root"], - "query" => [ - "db" => "db", - "retentionPolicy" => "default", + ], + '/tcp.test mark="element" \d+/i' ], - "body" => 'tcp.test,dc=us-west,region=us mark="element" 1257894000000000000', - ])->shouldBeCalledTimes(1); - - $adapter = new InfluxHttpAdapter($httpClient->reveal(), $options); - $adapter->send([ - "time" => "2009-11-10T23:00:00Z", - "tags" => ["region" => "us"], - "points" => [ + [ + (new Options())->setDatabase("db"), [ - "measurement" => "tcp.test", - "fields" => [ - "mark" => "element" + "time" => "2009-11-10T23:00:00Z", + "points" => [ + [ + "measurement" => "tcp.test", + "time" => "2009-11-10T23:00:00Z", + "fields" => [ + "mark" => "element" + ] + ] ] - ] - ] - ]); - } - - public function testAdapterForceIntegersCorrectly() - { - $guzzleHttp = $this->prophesize("GuzzleHttp\Client"); - $guzzleHttp->post("http://localhost:8086/write", [ - "auth" => ["root", "root"], - "query" => [ - "db" => "db", - "retentionPolicy" => "default", + ], + '/tcp.test mark="element" 1257894000000000000/i' ], - "body" => 'tcp.test mark="element",value=12i 1257894000000000000', - ])->shouldBeCalledTimes(1); - $options = (new Options())->setDatabase("db")->setForceIntegers(true); - $adapter = new InfluxHttpAdapter($guzzleHttp->reveal(), $options); - - $adapter->send([ - "time" => "2009-11-10T23:00:00Z", - "points" => [ + [ + (new Options())->setDatabase("db"), + [ + "time" => "2009-11-10T23:00:00Z", + "points" => [ + [ + "measurement" => "tcp.test", + "time" => "2009-11-10T23:00:00Z", + "fields" => [ + "mark" => "element" + ] + ], + [ + "measurement" => "tcp.test", + "fields" => [ + "mark" => "element2" + ] + ], + ] + ], + '/tcp.test mark="element" 1257894000000000000\ntcp.test mark="element2" 1257894000000000000$/i' + ], + [ + (new Options())->setDatabase("db"), [ - "measurement" => "tcp.test", - "fields" => [ - "mark" => "element", - "value" => 12, + "points" => [ + [ + "measurement" => "tcp.test", + "time" => "2009-11-10T23:00:00Z", + "fields" => [ + "mark" => "element", + ] + ] ] - ] - ] - ]); + ], + '/tcp.test mark="element" 1257894000000000000$/i' + ], + [ + (new Options())->setDatabase("db")->setTags(["dc" => "us-west"]), + [ + "points" => [ + [ + "measurement" => "tcp.test", + "fields" => [ + "mark" => "element" + ] + ] + ] + ], + '/tcp.test,dc=us-west mark="element" \d+/i' + ], + [ + (new Options())->setDatabase("db")->setTags(["dc" => "us-west"]), + [ + "tags" => ["region" => "us"], + "points" => [ + [ + "measurement" => "tcp.test", + "fields" => [ + "mark" => "element" + ] + ] + ] + ], + '/tcp.test,dc=us-west,region=us mark="element" \d+/i' + ], + [ + (new Options())->setDatabase("db")->setTags(["dc" => "us-west"]), + [ + "tags" => ["region" => "us"], + "points" => [ + [ + "measurement" => "tcp.test", + "tags" => [ + "tt" => "fi", + ], + "fields" => [ + "mark" => "element" + ] + ] + ] + ], + '/tcp.test,dc=us-west,region=us,tt=fi mark="element" \d+$/i' + ], + [ + (new Options())->setDatabase("db")->setTags(["dc" => "us-west"]), + [ + "tags" => ["region" => "us"], + "points" => [ + [ + "measurement" => "tcp.test", + "fields" => [ + "mark" => "element" + ] + ], + [ + "measurement" => "tcp.test", + "fields" => [ + "mark" => "element2" + ] + ] + ] + ], + '/tcp.test,dc=us-west,region=us mark="element" \d+\ntcp.test,dc=us-west,region=us mark="element2" \d+$/i' + ], + [ + (new Options())->setDatabase("db")->setForceIntegers(true), + [ + "points" => [ + [ + "measurement" => "tcp.test", + "fields" => [ + "mark" => "element", + "value" => 12, + ] + ] + ] + ], + '/tcp.test mark="element",value=12i \d+/i' + ], + ]; } }