package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/esapi"
"github.com/elastic/go-elasticsearch/v8"
"io/ioutil"
"log"
"os/exec"
"strings"
)
func main() {
cmd := exec.Command("minikube", "ip")
stdout, err := cmd.Output()
ingressHost := strings.TrimSpace(string(stdout))
cmd = exec.Command("minikube", "ssh-key")
stdout, err = cmd.Output()
ingressKey := strings.TrimSpace(string(stdout))
// copy ca cert
cmd = exec.Command("scp", "-i", ingressKey, "docker@"+ingressHost+":/var/elasticsearch/config/certs/ca/ca.crt", "/mnt/c/Users/ktimoney/go/elastic/")
stdout, err = cmd.Output()
// get the elasticsearch service nodePort
cmd = exec.Command("kubectl", "get", "service", "elasticsearch", "-n", "logging",
"-o", "jsonpath={.spec.ports[?(@.port==9200)].nodePort}")
stdout, err = cmd.Output()
secureIngressPort := strings.TrimSpace(string(stdout))
clusterURLs := []string{"https://" + ingressHost + ":" + secureIngressPort}
username := "elastic"
password := "secret"
cert, _ := ioutil.ReadFile("./ca.crt")
// client configuration
cfg := elasticsearch.Config{
Addresses: clusterURLs,
Username: username,
Password: password,
CACert: cert,
}
ctx := context.Background()
es, err := elasticsearch.NewClient(cfg)
if err != nil {
log.Fatalf("Error creating the client: %s", err)
}
log.Println(elasticsearch.Version)
resp, err := es.Info()
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer resp.Body.Close()
log.Println(resp)
// Index Query
indexResp, err := esapi.CatIndicesRequest{Format: "json", Pretty: true}.Do(ctx, es)
if err != nil {
return
}
indexBody := &indexResp.Body
defer indexResp.Body.Close()
fmt.Println(indexResp.String())
body, err := ioutil.ReadAll(*indexBody)
var results []map[string]interface{}
json.Unmarshal(body, &results)
fmt.Printf("Index: %+v\n", results)
indexName := fmt.Sprintf("%v", results[len(results)-1]["index"])
query := `{"query": {"match" : {"log": "token"}},"size": 3}`
runQuery(es, ctx, indexName, query)
query = `
{
"query": {
"bool": {
"must": [
{ "match": { "kubernetes.container_name": "istio-proxy" }},
{ "match": { "log": "token" }},
{ "match": { "tag": "rapp-jwt-invoker" }},
{ "range": { "@timestamp": { "gte": "now-60m" }}}
]
}
},"size": 1
}
`
runQuery(es, ctx, indexName, query)
query = `
{
"query": {
"bool": {
"must": [
{ "match": { "kubernetes.container_name": "istio-proxy" }},
{ "match": { "log": "GET /rapp-jwt-provider" }},
{ "match": { "kubernetes.labels.app_kubernetes_io/name": "rapp-jwt-provider" }},
{ "match_phrase": { "tag": "jwt-provider" }},
{ "range": { "@timestamp": { "gte": "now-60m" }}}
]
}
},"size": 1
}
`
runQuery(es, ctx, indexName, query)
}
func runQuery(es *elasticsearch.Client, ctx context.Context, indexName string, query string) {
// Query indexName
var mapResp map[string]interface{}
var buf bytes.Buffer
var b strings.Builder
b.WriteString(query)
read := strings.NewReader(b.String())
// Attempt to encode the JSON query and look for errors
if err := json.NewEncoder(&buf).Encode(read); err != nil {
log.Fatalf("Error encoding query: %s", err)
// Query is a valid JSON object
} else {
fmt.Println("\njson.NewEncoder encoded query:", read, "\n")
}
// Pass the JSON query to client's Search() method
searchResp, err := es.Search(
es.Search.WithContext(ctx),
es.Search.WithIndex(indexName),
es.Search.WithBody(read),
es.Search.WithTrackTotalHits(true),
es.Search.WithPretty(),
)
if err != nil {
log.Fatalf("Elasticsearch Search() API ERROR:", err)
}
defer searchResp.Body.Close()
// Decode the JSON response and using a pointer
if err := json.NewDecoder(searchResp.Body).Decode(&mapResp); err != nil {
log.Fatalf("Error parsing the response body: %s", err)
}
// Iterate the document "hits" returned by API call
for _, hit := range mapResp["hits"].(map[string]interface{})["hits"].([]interface{}) {
// Parse the attributes/fields of the document
doc := hit.(map[string]interface{})
// The "_source" data is another map interface nested inside of doc
source := doc["_source"]
// Get the document's _id and print it out along with _source data
docID := doc["_id"]
fmt.Println("docID:", docID)
fmt.Println("_source:", source, "\n")
// extract the @timestamp field
timeStamp := fmt.Sprintf("%v", source.(map[string]interface{})["@timestamp"])
fmt.Println("timeStamp:", timeStamp)
// extract the tag field
tag := fmt.Sprintf("%v", source.(map[string]interface{})["tag"])
fmt.Println("tag:", tag)
// extract the log field
k8slog := fmt.Sprintf("%v", source.(map[string]interface{})["log"])
fmt.Println("log:", k8slog)
}
hits := int(mapResp["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64))
fmt.Println("Matches:", hits)
} |