Subscribe to Stuck in an Infiniteloop        RSS Feed
-----

Ingesting Prometheus Remote Write Data Into Your Apache Camel Flow

Icon Leave Comment
Building on the last two posts, we'll now ingest this Prometheus data sampling into Apache Camel.

To recap from the previous entries, we need to scoop up remote, types, and gogoproto/gogo .proto files and compile java code from them for use in this project.

We'll be using Camel 3.0.0. We'll use camel-jetty to simplify our server setup. We'll also need camel-protobuf for data unmarshaling.

  <dependencyManagement>
    <dependencies>
      <!-- Camel BOM -->
      <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-parent</artifactId>
        <version>3.0.0</version>
        <scope>import</scope>
        <type>pom</type>
      </dependency>
    </dependencies>
  </dependencyManagement>
...
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-jetty</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-protobuf</artifactId>
    </dependency>



The main interesting tidbit here is that we could do all the processing in our own custom Processor, but camel-protobuf has an unmarshal() based on a data format we give it. So we'll ingest from the same endpoint in jetty (now done with a single line with the from() processor and camel-jetty), snappy uncompress, unmarshall, convert to json, and write it to disk for viewing.

package org.camel.prometheus.example;

import com.google.protobuf.util.JsonFormat;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.converter.stream.InputStreamCache;
import org.apache.camel.dataformat.protobuf.ProtobufDataFormat;
import org.xerial.snappy.Snappy;
import prometheus.Remote;

import java.io.ByteArrayOutputStream;

public class MyRouteBuilder extends RouteBuilder {

    private static JsonFormat.Printer JSON_PRINTER = JsonFormat.printer();

    public void configure() {
        ProtobufDataFormat format = new ProtobufDataFormat(Remote.WriteRequest.getDefaultInstance());

        from("jetty:http://localhost:8000/receive")
                .log("Received message on /receive endpoint")
                .process(exchange -> {
                    InputStreamCache cache = (InputStreamCache)exchange.getIn().getBody();
                    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                    int nRead;
                    byte[] data = new byte[1024];
                    while ((nRead = cache.read(data, 0, data.length)) != -1) {
                        buffer.write(data, 0, nRead);
                    }
                    buffer.flush();
                    exchange.getMessage().setBody(Snappy.uncompress(buffer.toByteArray()));
                })
                .unmarshal(format)
                .process(exchange -> {
                    String json = JSON_PRINTER.print((Remote.WriteRequest)exchange.getIn().getBody());
                    exchange.getMessage().setBody(json);
                })
        .to("file://target/data_samples");
    }
}



Code repo can be found here.

Happy coding!

0 Comments On This Entry

 

March 2020

S M T W T F S
1234567
891011121314
15161718192021
22232425262728
29 3031    

Tags

    Recent Entries

    Recent Comments

    Search My Blog

    1 user(s) viewing

    1 Guests
    0 member(s)
    0 anonymous member(s)