Stuck in an Infiniteloop
-----

Ingesting Prometheus Remote Write Data Into Your Apache Camel Flow

Building on the last two posts, we'll now ingest the Prometheus sample data into Apache Camel.

To recap from the previous entries, we need to grab the remote.proto, types.proto, and gogoproto/gogo.proto files and compile Java code from them for use in this project.

We'll be using Camel 3.0.0. We'll use camel-jetty to simplify our server setup. We'll also need camel-protobuf for data unmarshaling.

  <dependencyManagement>
    <dependencies>
      <!-- Camel BOM -->
      <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-parent</artifactId>
        <version>3.0.0</version>
        <scope>import</scope>
        <type>pom</type>
      </dependency>
    </dependencies>
  </dependencyManagement>
...
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-jetty</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-protobuf</artifactId>
    </dependency>



The interesting tidbit here is that we could do all the processing in our own custom Processor, but camel-protobuf provides an unmarshal() step driven by a data format we give it. So we'll ingest from the same endpoint in Jetty (now a single from() line thanks to camel-jetty), Snappy-uncompress the body, unmarshal it, convert it to JSON, and write it to disk for viewing.

package org.camel.prometheus.example;

import com.google.protobuf.util.JsonFormat;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.converter.stream.InputStreamCache;
import org.apache.camel.dataformat.protobuf.ProtobufDataFormat;
import org.xerial.snappy.Snappy;
import prometheus.Remote;

import java.io.ByteArrayOutputStream;

public class MyRouteBuilder extends RouteBuilder {

    // Renders protobuf messages as JSON; created once and shared.
    private static final JsonFormat.Printer JSON_PRINTER = JsonFormat.printer();

    @Override
    public void configure() {
        // Tells camel-protobuf which message type unmarshal() should produce.
        ProtobufDataFormat format = new ProtobufDataFormat(Remote.WriteRequest.getDefaultInstance());

        from("jetty:http://localhost:8000/receive")
                .log("Received message on /receive endpoint")
                // Drain the request body, then Snappy-uncompress it into raw protobuf bytes.
                .process(exchange -> {
                    // The body arrives here as a cached stream.
                    InputStreamCache cache = (InputStreamCache)exchange.getIn().getBody();
                    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                    int nRead;
                    byte[] data = new byte[1024];
                    while ((nRead = cache.read(data, 0, data.length)) != -1) {
                        buffer.write(data, 0, nRead);
                    }
                    buffer.flush();
                    exchange.getMessage().setBody(Snappy.uncompress(buffer.toByteArray()));
                })
                // Parse the bytes into a Remote.WriteRequest.
                .unmarshal(format)
                // Convert the message to a JSON string.
                .process(exchange -> {
                    String json = JSON_PRINTER.print((Remote.WriteRequest)exchange.getIn().getBody());
                    exchange.getMessage().setBody(json);
                })
                // Write each message to a file under target/data_samples.
                .to("file://target/data_samples");
    }
}
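
The first processor in the route drains the request body by hand before uncompressing it. As a rough sketch (not code from the repo), that loop could be pulled out into a small helper that works on any InputStream, which makes it easy to exercise without starting Jetty. StreamDrain and drain() are hypothetical names introduced here for illustration, and only the JDK is assumed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the original project.
class StreamDrain {

    // Reads the stream to its end and returns everything as one byte array.
    static byte[] drain(InputStream in) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[1024];
        int nRead;
        // read() returns -1 once the stream is exhausted.
        while ((nRead = in.read(chunk, 0, chunk.length)) != -1) {
            buffer.write(chunk, 0, nRead);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // Stand-in payload; in the route this would be the Snappy-compressed request body.
        byte[] payload = "stand-in for a request body".getBytes(StandardCharsets.UTF_8);
        byte[] copy = drain(new ByteArrayInputStream(payload));
        System.out.println("drained " + copy.length + " bytes");
    }
}
```

Since InputStreamCache is itself an InputStream, the processor body would then shrink to something like exchange.getMessage().setBody(Snappy.uncompress(StreamDrain.drain(cache))), assuming the helper sits on the route's classpath.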



Code repo can be found here.

Happy coding!

February 2020

S M T W T F S
      1
2345678
9101112131415
16 17 1819202122
23242526272829

Tags

    Recent Entries

    Recent Comments

    Search My Blog

    2 user(s) viewing

    2 Guests
    0 member(s)
    0 anonymous member(s)