1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use chrono::{DateTime, Duration, Utc};
use libra_logger::json_log::JsonLogEntry;
use reqwest::Client;
use serde::Deserialize;
use serde_json::json;

pub mod counters;
pub mod trace;

pub mod prelude {
    pub use crate::{
        end_trace, node_sampling_data, send_logs, trace_code_block, trace_edge, trace_event,
    };
}

pub use trace::{is_selected, libra_trace_set, set_libra_trace};

const TRACE_EVENT: &str = "trace_event";
const TRACE_EDGE: &str = "trace_edge";
const PAGING_SIZE: usize = 10000;

#[derive(Deserialize)]
struct Response {
    #[serde(rename = "_scroll_id")]
    scroll_id: String,
    hits: Hits,
}

#[derive(Deserialize)]
struct Hits {
    hits: Vec<Hit>,
}

#[derive(Deserialize)]
struct Hit {
    #[serde(rename = "_source")]
    source: Source,
}

#[derive(Deserialize)]
struct Source {
    data: serde_json::Value,
    #[serde(rename = "kubernetes.pod_name")]
    pod_name: String,
}

pub struct LibraTraceClient {
    client: Client,
    addr: String,
}

impl LibraTraceClient {
    /// Create LibraTraceClient from a valid socket address.
    pub fn new<A: AsRef<str>>(address: A, port: u16) -> Self {
        let client = Client::new();
        let addr = format!("http://{}:{}", address.as_ref(), port);

        Self { client, addr }
    }

    pub async fn get_libra_trace(
        &self,
        start_time: DateTime<Utc>,
        duration: Duration,
    ) -> Result<Vec<(String, JsonLogEntry)>> {
        let start = start_time.format("%FT%T%.3fZ").to_string();
        let end = (start_time + duration).format("%FT%T%.3fZ").to_string();
        let response = self
            .client
            .get(&format!("{}/_search?scroll=1m", self.addr))
            .json(&json!(
            {
                "size": PAGING_SIZE,
                "query": {
                    "bool": {
                        "must": [
                            { "term": { "name":   "libra_trace"}}
                        ],
                        "filter": [
                            { "range": { "@timestamp": { "gte": start, "lte": end }}}
                        ]
                    }
                }
            }
            ))
            .send()
            .await?;

        let response: Response = response.json().await?;

        let mut id = response.scroll_id.clone();
        let mut v: Vec<(String, JsonLogEntry)> = Vec::new();
        parse_response(&mut v, response)?;

        loop {
            let response = self
                .client
                .get(&format!("{}/_search/scroll", self.addr))
                .json(&json!(
                {
                    "scroll" : "1m",
                    "scroll_id" : id
                }
                ))
                .send()
                .await?;
            let response: Response = response.json().await?;
            id = response.scroll_id.clone();
            let hits = response.hits.hits.len();
            parse_response(&mut v, response)?;
            if hits < PAGING_SIZE {
                break;
            }
        }
        Ok(v)
    }
}

fn parse_response(v: &mut Vec<(String, JsonLogEntry)>, response: Response) -> Result<()> {
    for item in response.hits.hits {
        let peer = item.source.pod_name;
        let item = item.source.data;
        if let Some(trace_event) = item.get(TRACE_EVENT) {
            v.push((peer, serde_json::from_value(trace_event.clone())?));
        } else if let Some(trace_edge) = item.get(TRACE_EDGE) {
            v.push((peer, serde_json::from_value(trace_edge.clone())?));
        }
    }
    Ok(())
}