1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

#![forbid(unsafe_code)]

// Re-export counter types from prometheus crate
pub use libra_metrics_core::{
    register_histogram, register_histogram_vec, register_int_counter, register_int_counter_vec,
    register_int_gauge, register_int_gauge_vec, Histogram, HistogramTimer, HistogramVec,
    IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
};

use libra_logger::{error, info};
use libra_metrics_core::{Encoder, TextEncoder};
use std::{env, sync::mpsc, thread, thread::JoinHandle, time::Duration};

const DEFAULT_PUSH_FREQUENCY_SECS: u64 = 15;

/// MetricsPusher provides a function to push a list of Metrics to a configurable
/// pushgateway endpoint.
#[must_use = "Assign the contructed pusher to a variable, \
              otherwise the worker thread is joined immediately."]
pub struct MetricsPusher {
    worker_thread: Option<JoinHandle<()>>,
    quit_sender: mpsc::Sender<()>,
}

impl MetricsPusher {
    fn push(push_metrics_endpoint: &str) {
        let mut buffer = Vec::new();

        if let Err(e) = TextEncoder::new().encode(&libra_metrics_core::gather(), &mut buffer) {
            error!("Failed to encode push metrics: {}.", e.to_string());
        } else {
            let response = ureq::post(&push_metrics_endpoint)
                .timeout_connect(10_000)
                .send_bytes(&buffer);
            if let Some(error) = response.synthetic_error() {
                error!(
                    "Failed to push metrics to {}. Error: {}",
                    push_metrics_endpoint, error
                );
            }
        }
    }

    fn worker(
        quit_receiver: mpsc::Receiver<()>,
        push_metrics_endpoint: String,
        push_metrics_frequency_secs: u64,
    ) {
        while quit_receiver
            .recv_timeout(Duration::from_secs(push_metrics_frequency_secs))
            .is_err()
        {
            // Timeout, no quit signal received.
            Self::push(&push_metrics_endpoint);
        }
        // final push
        Self::push(&push_metrics_endpoint);
    }

    fn start_worker_thread(quit_receiver: mpsc::Receiver<()>) -> Option<JoinHandle<()>> {
        // eg value for PUSH_METRICS_ENDPOINT: "http://pushgateway.server.com:9091/metrics/job/safety_rules"
        let push_metrics_endpoint = match env::var("PUSH_METRICS_ENDPOINT") {
            Ok(s) => s,
            Err(_) => {
                info!("PUSH_METRICS_ENDPOINT env var is not set. Skipping sending metrics.");
                return None;
            }
        };
        let push_metrics_frequency_secs = match env::var("PUSH_METRICS_FREQUENCY_SECS") {
            Ok(s) => match s.parse::<u64>() {
                Ok(i) => i,
                Err(_) => {
                    error!("Invalid value for PUSH_METRICS_FREQUENCY_SECS: {}", s);
                    return None;
                }
            },
            Err(_) => DEFAULT_PUSH_FREQUENCY_SECS,
        };
        info!(
            "Starting push metrics loop. Sending metrics to {} with a frequency of {} seconds",
            push_metrics_endpoint, push_metrics_frequency_secs
        );
        Some(thread::spawn(move || {
            Self::worker(
                quit_receiver,
                push_metrics_endpoint,
                push_metrics_frequency_secs,
            )
        }))
    }

    /// start starts a new thread and periodically pushes the metrics to a pushgateway endpoint
    pub fn start() -> Self {
        let (tx, rx) = mpsc::channel();
        let worker_thread = Self::start_worker_thread(rx);

        Self {
            worker_thread,
            quit_sender: tx,
        }
    }

    pub fn join(&mut self) {
        if let Some(worker_thread) = self.worker_thread.take() {
            if let Err(e) = self.quit_sender.send(()) {
                error!(
                    "Failed to send quit signal to metric pushing worker thread: {:?}",
                    e
                );
            }
            if let Err(e) = worker_thread.join() {
                error!("Failed to join metric pushing worker thread: {:?}", e);
            }
        }
    }
}

impl Drop for MetricsPusher {
    fn drop(&mut self) {
        self.join()
    }
}