Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagetext
titleSDK Example
package main

import (
        "bytes"
        "context"
        "encoding/json"
        "fmt"
        "github.com/elastic/go-elasticsearch/esapi"
        "github.com/elastic/go-elasticsearch/v8"
        "io/ioutil"
        "log"
        "os/exec"
        "strings"
)

func main() {
        cmd := exec.Command("minikube", "ip")
        stdout, err := cmd.Output()
        ingressHost := strings.TrimSpace(string(stdout))

        cmd = exec.Command("minikube", "ssh-key")
        stdout, err = cmd.Output()
        ingressKey := strings.TrimSpace(string(stdout))

        // copy ca cert
        cmd = exec.Command("scp", "-i", ingressKey, "docker@"+ingressHost+":/var/elasticsearch/config/certs/ca/ca.crt", "/mnt/c/Users/ktimoney/go/elastic/")
        stdout, err = cmd.Output()

        // get the elasticsearch service nodePort
        cmd = exec.Command("kubectl", "get", "service", "elasticsearch", "-n", "logging",
                "-o", "jsonpath={.spec.ports[?(@.port==9200)].nodePort}")
        stdout, err = cmd.Output()
        secureIngressPort := strings.TrimSpace(string(stdout))

        clusterURLs := []string{"https://" + ingressHost + ":" + secureIngressPort}
        username := "elastic"
        password := "secret"
        cert, _ := ioutil.ReadFile("./ca.crt")

        // client configuration
        cfg := elasticsearch.Config{
                Addresses: clusterURLs,
                Username:  username,
                Password:  password,
                CACert:    cert,
        }
        ctx := context.Background()

        es, err := elasticsearch.NewClient(cfg)
        if err != nil {
                log.Fatalf("Error creating the client: %s", err)
        }
        log.Println(elasticsearch.Version)

        resp, err := es.Info()
        if err != nil {
                log.Fatalf("Error getting response: %s", err)
        }
        defer resp.Body.Close()
        log.Println(resp)

        // Index Query
        indexResp, err := esapi.CatIndicesRequest{Format: "json", Pretty: true}.Do(ctx, es)
        if err != nil {
                return
        }
        indexBody := &indexResp.Body
        defer indexResp.Body.Close()

        fmt.Println(indexResp.String())

        body, err := ioutil.ReadAll(*indexBody)

        var results []map[string]interface{}
        json.Unmarshal(body, &results)
        fmt.Printf("Index: %+v\n", results)
        indexName := fmt.Sprintf("%v", results[len(results)-1]["index"])
        query := `{"query": {"match" : {"log": "token"}},"size": 3}`
        runQuery(es, ctx, indexName, query)
        query = `
                {
                 "query": {
                  "bool": {
                    "must": [
                      { "match": { "kubernetes.container_name": "istio-proxy" }},
                      { "match": { "log": "token" }},
                      { "match": { "tagkubernetes.labels.app_kubernetes_io/name": "rapp-jwt-invoker" }},
                      { "range": { "@timestamp": { "gte": "now-60m" }}}
                     ]
                   }
                 },"size": 1
                }
        `
        runQuery(es, ctx, indexName, query)
        query = `
                {
                 "query": {
                  "bool": {
                    "must": [
                      { "match": { "kubernetes.container_name": "istio-proxy" }},
                      { "match": { "log": "GET /rapp-jwt-provider" }},
                      { "match": { "kubernetes.labels.app_kubernetes_io/name": "rapp-jwt-provider" }},
                      { "match_phrase": { "tag": "jwt-provider" }},
                      { "range": { "@timestamp": { "gte": "now-60m" }}}
                     ]
                   }
                 },"size": 1
                }
        `
        runQuery(es, ctx, indexName, query)

}

func runQuery(es *elasticsearch.Client, ctx context.Context, indexName string, query string) {
        // Query indexName
        var mapResp map[string]interface{}
        var buf bytes.Buffer
        var b strings.Builder
        b.WriteString(query)
        read := strings.NewReader(b.String())

        // Attempt to encode the JSON query and look for errors
        if err := json.NewEncoder(&buf).Encode(read); err != nil {
                log.Fatalf("Error encoding query: %s", err)

                // Query is a valid JSON object
        } else {
                fmt.Println("\njson.NewEncoder encoded query:", read, "\n")
        }

        // Pass the JSON query to client's Search() method
        searchResp, err := es.Search(
                es.Search.WithContext(ctx),
                es.Search.WithIndex(indexName),
                es.Search.WithBody(read),
                es.Search.WithTrackTotalHits(true),
                es.Search.WithPretty(),
        )

        if err != nil {
                log.Fatalf("Elasticsearch Search() API ERROR:", err)
        }
        defer searchResp.Body.Close()

        // Decode the JSON response and using a pointer
        if err := json.NewDecoder(searchResp.Body).Decode(&mapResp); err != nil {
                log.Fatalf("Error parsing the response body: %s", err)
        }

        // Iterate the document "hits" returned by API call
        for _, hit := range mapResp["hits"].(map[string]interface{})["hits"].([]interface{}) {

                // Parse the attributes/fields of the document
                doc := hit.(map[string]interface{})

                // The "_source" data is another map interface nested inside of doc
                source := doc["_source"]

                // Get the document's _id and print it out along with _source data
                docID := doc["_id"]
                fmt.Println("docID:", docID)
                fmt.Println("_source:", source, "\n")
                // extract the @timestamp field
                timeStamp := fmt.Sprintf("%v", source.(map[string]interface{})["@timestamp"])
                fmt.Println("timeStamp:", timeStamp)
                // extract the tag field
                tag := fmt.Sprintf("%v", source.(map[string]interface{})["tag"])
                fmt.Println("tag:", tag)
                // extract the log field
                k8slog := fmt.Sprintf("%v", source.(map[string]interface{})["log"])
                fmt.Println("log:", k8slog)
        }
        hits := int(mapResp["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64))
        fmt.Println("Matches:", hits)

}

...