# NAME

POE::Component::ElasticSearch::Indexer - POE session to index data to ElasticSearch

# VERSION

version 0.015

# SYNOPSIS

This POE session indexes data into an Elasticsearch cluster.

    use POE qw{ Component::ElasticSearch::Indexer };

    my $es_session = POE::Component::ElasticSearch::Indexer->spawn(
        Alias            => 'es',                    # Default
        Protocol         => 'http',                  # Default
        Servers          => [qw(localhost)],         # Default
        Timeout          => 5,                       # Default
        FlushInterval    => 30,                      # Default
        FlushSize        => 1_000,                   # Default
        LoggingConfig    => undef,                   # Default
        DefaultIndex     => 'logs-%Y.%m.%d',         # Default
        DefaultType      => '_doc',                  # Default
        BatchDir         => '/tmp/es_index_backlog', # Default
        BatchDiskSpace   => undef,                   # Default
        StatsHandler     => undef,                   # Default
        StatsInterval    => 60,                      # Default
        AuthUsername     => $ENV{USER},              # Default
        AuthPassword     => undef,                   # Default
    );

    # Index the document using the queue for better performance
    $poe_kernel->post( es => queue => $json_data );
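Despite its name, `$json_data` in the call above is not a JSON string in the common case: the `queue` event described under EVENTS takes an array reference of hash references. As a minimal sketch, with field names and values invented purely for illustration, a payload could be built like this:

```perl
use strict;
use warnings;

# Illustrative payload: every field name and value here is made up.
# Keys starting with an underscore are the special keys the queue event
# documents (_id, _type, _index, _epoch).
my $json_data = [
    # Routed to an explicit index
    { message => 'user login', user => 'alice', _index => 'authentication-2020.01.02' },
    # No _index: DefaultIndex is used, resolved against this _epoch
    { message => 'disk usage at 91%', _epoch => 1577923200 },
];

# With POE loaded and the indexer spawned under the default alias:
#   $poe_kernel->post( es => queue => $json_data );
printf "%d documents ready to queue\n", scalar @$json_data;
```

Collecting several documents into one array reference before posting keeps the number of POE events low, which is the same reason the longer example under EVENTS batches its input.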

# DESCRIPTION

This module exists to provide event-based Perl programs with a simple way to
index documents into an Elasticsearch cluster.

## spawn()

This method spawns the ElasticSearch indexing [POE::Session](https://metacpan.org/pod/POE%3A%3ASession). It accepts the
following parameters.

- **Alias**

    The alias under which this session is available to other sessions.  The
    default is **es**.

- **Protocol**

    Either `http` or `https`.  Defaults to **http**.

- **Servers**

    A list of Elasticsearch hosts to connect to.  Each entry may be in the form
    `hostname` or `hostname:port`.  Defaults to `localhost`.

- **AuthUsername**

    Username for HTTP Basic Authorization, defaults to `$ENV{USER}`.

- **AuthPassword**

    Password for HTTP Basic Authorization.  Setting this enables HTTP Basic
    Authorization.

- **PoolConnections**

    Boolean, default true.  Enable connection pooling with
    [POE::Component::Client::Keepalive](https://metacpan.org/pod/POE%3A%3AComponent%3A%3AClient%3A%3AKeepalive).  This is desirable in most cases, but can
    result in timeouts piling up.  You may wish to disable this if you notice that
    indexing takes a while to recover after timeout events.

- **KeepAliveTimeout**

    Requires `PoolConnections`.

    Set the keep\_alive timeout in seconds for the creation of a
    [POE::Component::Client::Keepalive](https://metacpan.org/pod/POE%3A%3AComponent%3A%3AClient%3A%3AKeepalive) connection pool.

    Defaults to **2**.

- **MaxConnsPerServer**

    Requires `PoolConnections`.

    Maximum number of simultaneous connections to an Elasticsearch node.  Used in
    the creation of a [POE::Component::Client::Keepalive](https://metacpan.org/pod/POE%3A%3AComponent%3A%3AClient%3A%3AKeepalive) connection pool.

    Defaults to **3**.

- **MaxConnsTotal**

    Requires `PoolConnections`.

    Maximum number of simultaneous connections to all servers.  Used in
    the creation of a [POE::Component::Client::Keepalive](https://metacpan.org/pod/POE%3A%3AComponent%3A%3AClient%3A%3AKeepalive) connection pool.

    Defaults to **MaxConnsPerServer \* number of Servers**.

- **MaxPendingRequests**

    Requires `PoolConnections`.

    Maximum number of requests backlogged in the connection pool.  Defaults to **5**.

- **MaxFailedRatio**

    A number between 0 and 1 representing the fraction of bulk requests that can
    fail before we back off the cluster for the **StatsInterval**.  This is
    calculated every **StatsInterval**.  The default is **0.8**, or 80%.

- **LoggingConfig**

    The [Log::Log4perl](https://metacpan.org/pod/Log%3A%3ALog4perl) configuration file for the indexer to use.  Defaults to
    writing logs to the current directory into the file `es_indexing.log`.

- **Timeout**

    Number of seconds for the HTTP transport connect and transport timeouts.
    Defaults to **5** seconds.  The total request timeout, which covers waiting
    for an open connection slot and then completing the request, is twice this
    value.

- **FlushInterval**

    Maximum number of seconds which can pass before a flush of the queue is
    attempted.  Defaults to **30** seconds.

- **FlushSize**

    Once this number of documents is reached, flush the queue regardless of time
    since the last flush.  Defaults to **1,000**.

- **DefaultIndex**

    A `strftime`-aware index pattern to use if the document is missing an
    `_index` element.  Defaults to **logs-%Y.%m.%d**.

- **DefaultType**

    Use this `_type` attribute if the document is missing one.  Defaults to
    **\_doc** to be compatible with ES 7.x.

    The `_type` attribute will be stripped from documents if the cluster is
    running a version greater than 7.0.0.

- **BatchDir**

    If the cluster responds with an HTTP failure code, the batch is written to disk
    in this directory to be indexed when the cluster is available again.  Defaults
    to `/tmp/es_index_backlog`.

- **BatchDiskSpace**

    Defaults to `undef`, which means disk space isn't checked.  If set, and the
    space used by saved batches goes over this limit, every new batch saved will
    delete the oldest batch.  Checked every ten batches.

    You may specify either as absolute bytes or using shortcuts:

        BatchDiskSpace => '500kb',
        BatchDiskSpace => '100mb',
        BatchDiskSpace => '10gb',
        BatchDiskSpace => '1tb',

- **MaxRecoveryBatches**

    The number of batches to process per backlog event.  This will only come into
    play if there are batches on disk to flush.  Defaults to **10**.

- **StatsHandler**

    A code reference that will be passed a hash reference containing the keys and
    values of counters tracked by this component.  Defaults to `undef`, meaning no
    code is run.

- **StatsInterval**

    Run the `StatsHandler` every `StatsInterval` seconds.  Defaults to **60**.

- **BacklogInterval**

    Run the backlog processing event every `BacklogInterval` seconds.  Defaults
    to **60**.  Will process up to `MaxRecoveryBatches` batches per
    `BacklogInterval`.

    This event only fires when there are batches on disk.  When it's done
    processing them, it will then stop firing.

- **CleanupInterval**

    Run the cleanup event every `CleanupInterval` seconds.  Defaults to **60**.
    This checks that the `BatchDiskSpace` limit is honored and deletes the
    oldest batches if it is exceeded.

    This event only fires when there are batches on disk.
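Several of the parameters above are derived from others: the total request timeout is twice `Timeout`, `MaxConnsTotal` defaults to `MaxConnsPerServer` multiplied by the number of `Servers`, and `BatchDiskSpace` accepts size shortcuts. The following is a minimal sketch of that arithmetic, not the module's own code. The helper names `size_to_bytes` and `derived_settings`, the result keys, and the host names are hypothetical, and the sketch assumes binary multiples (1kb = 1024 bytes), which the module may not share.

```perl
use strict;
use warnings;

# Hypothetical helper: convert a BatchDiskSpace value such as '500kb' or
# '10gb' into bytes.  Assumes 1kb = 1024 bytes; the module's own
# conversion may differ.
sub size_to_bytes {
    my ($size) = @_;
    return undef unless defined $size;    # undef: disk space is not checked
    my %multiplier = ( kb => 1024, mb => 1024**2, gb => 1024**3, tb => 1024**4 );
    my ($number, $unit) = $size =~ /^\s*(\d+(?:\.\d+)?)\s*(kb|mb|gb|tb)?\s*$/i
        or die "Unrecognized size: $size\n";
    return defined $unit ? $number * $multiplier{ lc $unit } : $number;
}

# Hypothetical helper: apply the documented defaults and derive the values
# that depend on other parameters.
sub derived_settings {
    my (%args) = @_;
    my $servers         = $args{Servers}           // ['localhost'];
    my $timeout         = $args{Timeout}           // 5;
    my $conns_per_host  = $args{MaxConnsPerServer} // 3;
    return (
        # Waiting for a connection slot, then completing the request
        TotalRequestTimeout => $timeout * 2,
        # Explicit value wins; otherwise per-server limit times server count
        MaxConnsTotal       => $args{MaxConnsTotal} // $conns_per_host * @$servers,
        BatchDiskBytes      => size_to_bytes( $args{BatchDiskSpace} ),
    );
}

my %settings = derived_settings(
    Servers        => [qw(es01:9200 es02:9200)],    # illustrative hosts
    BatchDiskSpace => '100mb',
);
printf "%s => %s\n", $_, $settings{$_} // 'undef' for sort keys %settings;
```

With two servers and the defaults, this yields a total of 6 pooled connections and a 10 second total request timeout.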

## EVENTS

The events provided by this component.

- **version**

    Runs at startup to get the Elasticsearch version.

- **queue**

    Takes an array reference of hash references to be transformed into JSON
    documents and submitted to the cluster's `_bulk` API.

    Each hash reference may pass in the following special keys, which will be used
    to index the event.  These keys will be deleted from the document being indexed
    as they have special meaning to the `_bulk` API.

    - **\_id**

        Will be submitted as the document id in the bulk operation.  If not
        specified, Elasticsearch will generate a UUID for each document
        automatically.

    - **\_type**

        Will be submitted as the document type in the bulk operation.  If not
        specified, we'll use the `DefaultType` specified in the `spawn()` method.

        Deprecated and removed for clusters running Elasticsearch version 7.0 or higher.

    - **\_index**

        Will cause the document to be indexed into that index.  If not specified,
        the `DefaultIndex` will be used.

    - **\_epoch**

        If the `DefaultIndex` uses a `strftime` compatible string, you may specify an
        `_epoch` in every document.  If not specified, we'll assume the epoch to use
        for `strftime` calculations is the current time.

    For more information, see the [Elasticsearch Bulk API Docs](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html).

    Alternatively, you can provide an array reference containing blessed objects that
    provide an `as_bulk()` method.  The result of that method will be added to the
    bulk queue.

    If you've decided to construct the requisite newline-delimited JSON yourself,
    you may pass in an array reference containing scalars.  If you do, the module
    assumes you know what you're doing and will append that text to the existing
    bulk queue unchanged.

    Example use case:

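        use POE;
        use POSIX qw(strftime);
        use Parse::Syslog::Line qw(parse_syslog_line);

        # Programs whose events belong in the authentication index
        # (illustrative values)
        my %Authentication = map { $_ => 1 } qw(sshd sudo su);

        sub syslog_handle_line {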
            my ($kernel,$heap,$session,$line) = @_[KERNEL,HEAP,SESSION,ARG0];

            # Create a document from syslog data
            local $Parse::Syslog::Line::PruneRaw = 1;
            local $Parse::Syslog::Line::PruneEmpty = 1;
            my $evt = parse_syslog_line($line);

            # Override the type
            $evt->{_type} = 'syslog';

            # If we want to collect this event into an auth index:
            if( exists $Authentication{$evt->{program}} ) {
                $evt->{_index} = strftime('authentication-%Y.%m.%d',
                        localtime($evt->{epoch} || time)
                );
            }
            else {
                # Set an _epoch for the es queue DefaultIndex
                $evt->{_epoch} = $evt->{epoch} ? delete $evt->{epoch} : time;
            }
            # You'll want to batch these in your processor to avoid excess
            # overhead creating so many events in the POE loop
            push @{ $heap->{batch} }, $evt;

            # Once we hit 10 messages, force the flush
            $kernel->call( $session->ID => 'submit_batch') if @{ $heap->{batch} } >= 10;
        }

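        sub submit_batch {
            my ($kernel,$heap) = @_[KERNEL,HEAP];

            # Reset the batch scheduler
            $kernel->delay( 'submit_batch' => 10 );

            # Nothing to send if no events arrived since the last flush
            return unless @{ $heap->{batch} || [] };

            # Hand the batch to the indexer and start a new one
            $kernel->post( es => queue => delete $heap->{batch} );
            $heap->{batch} = [];
        }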

- **flush**

    Schedule a flush of the existing bulk updates to the cluster.  It should never
    be necessary to call this event unless you'd like to shut down the event loop
    faster.

- **backlog**

    Request that the disk-based backlog be processed.  You should never need to
    call this event: the session runs it once at startup and, if there's data to
    process, keeps rescheduling it as needed.  When a bulk operation fails,
    resulting in a batch file, this event is scheduled to run again.

- **shutdown**

    Inform this session that you'd like to wrap up operations.  This prevents recurring events from being scheduled.
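To make the special keys accepted by the `queue` event more concrete, here is a minimal sketch of how one queued hash reference could be turned into the two newline-delimited JSON lines of an Elasticsearch `_bulk` `index` action. This is not the module's implementation: `doc_to_bulk` is a hypothetical helper, the sketch always drops `_type` (as documented for 7.x clusters), and it resolves the `strftime` pattern in UTC so the output is reproducible, whereas the module may use local time or a different bulk action.

```perl
use strict;
use warnings;
use JSON::PP;
use POSIX qw(strftime);

# canonical() sorts keys so the output is stable between runs
my $json = JSON::PP->new->canonical;

# Hypothetical helper, not part of the module: build the action line and
# the source line for one document.
sub doc_to_bulk {
    my ($doc, %defaults) = @_;
    my %source = %$doc;    # shallow copy, the caller's hash stays intact

    # The special keys are removed from the document body
    my $epoch = delete $source{_epoch} // time;
    my %meta  = (
        _index => delete $source{_index}
                  // strftime( $defaults{DefaultIndex}, gmtime $epoch ),
    );
    my $id = delete $source{_id};
    $meta{_id} = $id if defined $id;    # otherwise Elasticsearch assigns one
    delete $source{_type};              # assumption: targeting a 7.x cluster

    return $json->encode( { index => \%meta } ) . "\n"
         . $json->encode( \%source ) . "\n";
}

# Illustrative document; the field names are made up
print doc_to_bulk(
    { _id => 'abc', _type => 'syslog', _epoch => 1577923200, message => 'hi' },
    DefaultIndex => 'logs-%Y.%m.%d',
);
```

The helper returns plain text, so its output could also be passed to `queue` inside an array reference of scalars, the form the module appends to the bulk queue unchanged.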

# AUTHOR

Brad Lhotsky <brad@divisionbyzero.net>

# COPYRIGHT AND LICENSE

This software is Copyright (c) 2018 by Brad Lhotsky.

This is free software, licensed under:

    The (three-clause) BSD License

# CONTRIBUTOR

Mohammad S Anwar <mohammad.anwar@yahoo.com>

# SUPPORT

## Websites

The following websites have more information about this module, and may be of help to you. As always,
in addition to those websites please use your favorite search engine to discover more resources.

- MetaCPAN

    A modern, open-source CPAN search engine, useful to view POD in HTML format.

    [https://metacpan.org/release/POE-Component-ElasticSearch-Indexer](https://metacpan.org/release/POE-Component-ElasticSearch-Indexer)

- RT: CPAN's Bug Tracker

    The RT ( Request Tracker ) website is the default bug/issue tracking system for CPAN.

    [https://rt.cpan.org/Public/Dist/Display.html?Name=POE-Component-ElasticSearch-Indexer](https://rt.cpan.org/Public/Dist/Display.html?Name=POE-Component-ElasticSearch-Indexer)

## Source Code

This module's source code is available by visiting:
[https://github.com/reyjrar/POE-Component-ElasticSearch-Indexer](https://github.com/reyjrar/POE-Component-ElasticSearch-Indexer)