NAME AnyEvent::InfluxDB - An asynchronous library for InfluxDB time-series database VERSION version 0.01 SYNOPSIS use EV; use AnyEvent; use AnyEvent::Socket; use AnyEvent::Handle; use AnyEvent::InfluxDB; my $db = AnyEvent::InfluxDB->new( server => 'http://localhost:8086', username => 'admin', password => 'password', ); my $hdl; tcp_server undef, 8888, sub { my ($fh, $host, $port) = @_; $hdl = AnyEvent::Handle->new( fh => $fh, ); $hdl->push_read( line => sub { my (undef, $line) = @_; $db->write( database => 'mydb', data => $line, on_success => sub { print "$line written\n"; }, on_error => sub { print "$line error: @_\n"; }, ); $hdl->on_drain( sub { $hdl->fh->close; undef $hdl; } ); }, ); }; EV::run; DESCRIPTION Asynchronous client library for InfluxDB time-series database v0.9.2 . METHODS new my $db = AnyEvent::InfluxDB->new( server => 'http://localhost:8086', username => 'admin', password => 'password', ); Returns object representing given server "server" connected using optionally provided username "username" and password "password". Default value of "server" is "http://localhost:8086". If the server protocol is "https" then by default no validation of remote host certificate is performed. This can be changed by setting "ssl_options" parameter with any options accepted by AnyEvent::TLS. my $db = AnyEvent::InfluxDB->new( server => 'https://localhost:8086', username => 'admin', password => 'password', ssl_options => { verify => 1, verify_peername => 'https', ca_file => '/path/to/cacert.pem', } ); As an debugging aid the "on_request" code reference may also be provided. It will be executed before each request with the method name, url and POST data if set. my $db = AnyEvent::InfluxDB->new( on_request => sub { my ($method, $url, $post_data) = @_; print "$method $url\n"; print "$post_data\n" if $post_data; } ); Database Management create_database $cv = AE::cv; $db->create_database( database => "mydb", on_success => $cv, on_error => sub { $cv->croak("Failed to create database: @_"); } ); $cv->recv; Creates specified by "database" argument database. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. drop_database $cv = AE::cv; $db->drop_database( database => "mydb", on_success => $cv, on_error => sub { $cv->croak("Failed to drop database: @_"); } ); $cv->recv; Drops specified by "database" argument database. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. show_databases $cv = AE::cv; $db->show_databases( on_success => $cv, on_error => sub { $cv->croak("Failed to list databases: @_"); } ); my @db_names = $cv->recv; print "$_\n" for @db_names; Returns list of known database names. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. Retention Policy Management create_retention_policy $cv = AE::cv; $db->create_retention_policy( name => 'last_day', database => 'mydb', duration => '1d', replication => 1, default => 0, on_success => $cv, on_error => sub { $cv->croak("Failed to create retention policy: @_"); } ); $cv->recv; Creates new retention policy named by "name" on database "database" with duration "duration" and replication factor "replication". If "default" is provided and true the created retention policy becomes the default one. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. alter_retention_policy $cv = AE::cv; $db->alter_retention_policy( name => 'last_day', database => 'mydb', duration => '1d', replication => 1, default => 0, on_success => $cv, on_error => sub { $cv->croak("Failed to alter retention policy: @_"); } ); $cv->recv; Modifies retention policy named by "name" on database "database". At least one of duration "duration", replication factor "replication" or flag "default" must be set. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. show_retention_policies $cv = AE::cv; $db->show_retention_policies( database => 'mydb', on_success => $cv, on_error => sub { $cv->croak("Failed to list retention policies: @_"); } ); my @retention_policies = $cv->recv; for my $rp ( @retention_policies ) { print "Name: $rp->{name}\n"; print "Duration: $rp->{duration}\n"; print "Replication factor: $rp->{replicaN}\n"; print "Default?: $rp->{default}\n"; } Returns a list of hash references with keys "name", "duration", "replicaN" and "default" for each replication policy defined on database "database". The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. User Management create_user $cv = AE::cv; $db->create_user( username => 'jdoe', password => 'mypassword', all_privileges => 1, on_success => $cv, on_error => sub { $cv->croak("Failed to create user: @_"); } ); $cv->recv; Creates user with "username" and "password". If flag "all_privileges" is set to true created user will be granted cluster administration privileges. Note: "password" will be automatically enclosed in single quotes. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. set_user_password $cv = AE::cv; $db->set_user_password( username => 'jdoe', password => 'otherpassword', on_success => $cv, on_error => sub { $cv->croak("Failed to set password: @_"); } ); $cv->recv; Sets password to "password" for the user identified by "username". The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. show_users $cv = AE::cv; $db->show_users( on_success => $cv, on_error => sub { $cv->croak("Failed to list users: @_"); } ); my @users = $cv->recv; for my $u ( @users ) { print "Name: $u->{user}\n"; print "Admin?: $u->{admin}\n"; } Returns a list of hash references with keys "user" and "admin" for each defined user. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. grant_privileges $cv = AE::cv; $db->grant_privileges( username => 'jdoe', # privileges at single database database => 'mydb', access => 'ALL', # or to grant cluster administration privileges all_privileges => 1, on_success => $cv, on_error => sub { $cv->croak("Failed to grant privileges: @_"); } ); $cv->recv; Grants to user "username" access "access" on database "database". If flag "all_privileges" is set it grants cluster administration privileges instead. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. revoke_privileges $cv = AE::cv; $db->revoke_privileges( username => 'jdoe', # privileges at single database database => 'mydb', access => 'WRITE', # or to revoke cluster administration privileges all_privileges => 1, on_success => $cv, on_error => sub { $cv->croak("Failed to revoke privileges: @_"); } ); $cv->recv; Revokes from user "username" access "access" on database "database". If flag "all_privileges" is set it revokes cluster administration privileges instead. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. drop_user $cv = AE::cv; $db->drop_user( username => 'jdoe', on_success => $cv, on_error => sub { $cv->croak("Failed to drop user: @_"); } ); $cv->recv; Drops user "username". The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. Schema Exploration show_measurements $cv = AE::cv; $db->show_measurements( database => 'mydb', where => "host = 'server02'", on_success => $cv, on_error => sub { $cv->croak("Failed to list measurements: @_"); } ); my @measurements = $cv->recv; print "$_\n" for @measurements; Returns names of measurements from database "database", filtered by optional "where" clause. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. drop_measurement $cv = AE::cv; $db->drop_measurement( database => 'mydb', measurement => 'cpu_load', on_success => $cv, on_error => sub { $cv->croak("Failed to drop measurement: @_"); } ); $cv->recv; Drops measurement "measurement". The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. show_series $cv = AE::cv; $db->show_series( database => 'mydb', measurement => 'cpu_load', where => "host = 'server02'", on_success => $cv, on_error => sub { $cv->croak("Failed to list series: @_"); } ); my $series = $cv->recv; for my $measurement ( sort keys %{ $series } ) { print "Measurement: $measurement\n"; for my $s ( @{ $series->{$measurement} } ) { print " * $_: $s->{$_}\n" for sort keys %{ $s }; } } Returns from database "database" and optional measurement "measurement", optionally filtered by the "where" clause, an hash reference with measurements as keys and their unique tag sets as values. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. drop_series $cv = AE::cv; $db->drop_series( database => 'mydb', measurement => 'cpu_load', where => "host = 'server02'", on_success => $cv, on_error => sub { $cv->croak("Failed to drop measurement: @_"); } ); $cv->recv; Drops series from measurement "measurement" filtered by "where" clause from database "database". The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. show_tag_keys $cv = AE::cv; $db->show_tag_keys( database => 'mydb', measurement => 'cpu_load', on_success => $cv, on_error => sub { $cv->croak("Failed to list tag keys: @_"); } ); my $tag_keys = $cv->recv; for my $measurement ( sort keys %{ $tag_keys } ) { print "Measurement: $measurement\n"; print " * $_\n" for @{ $tag_keys->{$measurement} }; } Returns from database "database" and optional measurement "measurement", optionally filtered by the "where" clause, an hash reference with measurements as keys and their unique tag keys as values. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. show_tag_values $cv = AE::cv; $db->show_tag_values( database => 'mydb', measurement => 'cpu_load', # single key key => 'host', # or a list of keys keys => [qw( host region )], where => "host = 'server02'", on_success => $cv, on_error => sub { $cv->croak("Failed to list tag values: @_"); } ); my $tag_values = $cv->recv; for my $tag_key ( sort keys %{ $tag_values } ) { print "Tag key: $tag_key\n"; print " * $_\n" for @{ $tag_values->{$tag_key} }; } Returns from database "database" and optional measurement "measurement", values from a single tag key "key" or a list of tag keys "keys", optionally filtered by the "where" clause, an hash reference with tag keys as keys and their unique tag values as values. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. Continuous Queries create_continuous_query $cv = AE::cv; $db->create_continuous_query( database => 'mydb', name => 'per5minutes', query => 'SELECT MEAN(value) INTO "cpu_load_per5m" FROM cpu_load GROUP BY time(5m)', on_success => $cv, on_error => sub { $cv->croak("Failed to create continuous query: @_"); } ); $cv->recv; Creates new continuous query named by "name" on database "database" using query "query". The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. drop_continuous_query $cv = AE::cv; $db->drop_continuous_query( database => 'mydb', name => 'per5minutes', on_success => $cv, on_error => sub { $cv->croak("Failed to drop continuous query: @_"); } ); $cv->recv; Drops continuous query named by "name" on database "database". The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. show_continuous_queries $cv = AE::cv; $db->show_continuous_queries( database => 'mydb', on_success => $cv, on_error => sub { $cv->croak("Failed to list continuous queries: @_"); } ); my @continuous_queries = $cv->recv; for my $cq ( @continuous_queries ) { print "Name: $cq->{name}\n"; print "Query: $cq->{query}\n"; } Returns a list of hash references with keys "name" and "query" for each continuous query defined on database "database". The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. Writing Data write $cv = AE::cv; $db->write( database => 'mydb', precision => 'n', rp => 'last_day', data => [ # line protocol formatted 'cpu_load,host=server02,region=eu-east sensor="top",value=0.64 1437868012260500137', # or as an hash { measurement => 'cpu_load', tags => { host => 'server02', region => 'eu-east', }, fields => { value => '0.64', sensor => '"top"', }, time => time() * 10**9 } ], on_success => $cv, on_error => sub { $cv->croak("Failed to write data: @_"); } ); $cv->recv; Writes to database "database" and optional retention policy "rp", time-series data "data" with optional precision "precision". The "data" can be specified as single scalar value or as array reference. In either case the scalar variables are expected to be an formatted using line protocol or if hash with required keys "measurement" and "fields" and optional "tags" and "time". The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. Querying Data select $cv = AE::cv; $db->select( database => 'mydb', measurement => 'cpu_load', fields => 'host, count(value)', where => "region = 'eu-east' AND time > now() - 7d", group_by => 'time(5m), host', fill => 'previous', order_by => 'ASC', limit => 10, on_success => $cv, on_error => sub { $cv->croak("Failed to select data: @_"); } ); my $results = $cv->recv; for my $row ( @{ $results } ) { print "Measurement: $row->{name}\n"; print "Tags:\n"; print " * $_ = $row->{tags}->{$_}\n" for keys %{ $row->{tags} || {} }; print "Values:\n"; for my $value ( @{ $row->{values} || [] } ) { print " * $_ = $value->{$_}\n" for keys %{ $value || {} }; } } Executes an select query on database "database" created from provided arguments measurement "measurement", fields to select "fields", optional "where" clause, grouped by "group_by" and empty values filled with "fill", ordered by "order_by" and number of results limited to "limit". The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. query $cv = AE::cv; $db->query( query => { db => 'mydb', q => 'SELECT * FROM cpu_load', }, on_response => $cv, ); my ($response_data, $response_headers) = $cv->recv; Executes an arbitrary query using provided in "query" arguments. The required "on_response" code reference is executed with the raw response data and headers as parameters. CAVEATS Following the optimistic nature of InfluxDB this modules does not validate any parameters. Also quoting and escaping special characters is to be done by the user of this library. AUTHOR Alex J. G. Burzyński COPYRIGHT AND LICENSE This software is copyright (c) 2015 by Alex J. G. Burzyński . This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.