jmx exporter ソースコード解析

Jmx_exporterとは?

JMXのMBeansから、Prometheus用の形式に変換し、メトリクスを出力することができるExporterです。

どうやって使うの?

普通のアプリケーションはもちろん、KafkaやHDFSなどのオープンソースでもJMX用のポートを開ける物であれば簡単にアプリケーションの各種パラメーターを指定したポートに出力し、prometheusを使ってモニタリングシステムを組む事ができます

使い方は公式githubにて詳しく説明があるので割愛します。

github.com

ソースコード解析

凄くシンプルなプログラムなので、入り口から順に追って読んでみましょう

アプリケーション起動

入り口はここです: jmx_exporter/jmx_prometheus_javaagent/src/main/java/io/prometheus/jmx/JavaAgent.java#agentmain

    public static void agentmain(String agentArgument, Instrumentation instrumentation) throws Exception {
        premain(agentArgument, instrumentation);
    }

    public static void premain(String agentArgument, Instrumentation instrumentation) throws Exception {
        // Bind to all interfaces by default (this includes IPv6).
        String host = "0.0.0.0";

        try {
            Config config = parseConfig(agentArgument, host);

            new BuildInfoCollector().register();
            new JmxCollector(new File(config.file)).register();
            DefaultExports.initialize();
            server = new HTTPServer(config.socket, CollectorRegistry.defaultRegistry, true);
        }
        catch (IllegalArgumentException e) {
            System.err.println("Usage: -javaagent:/path/to/JavaAgent.jar=[host:]<port>:<yaml configuration file> " + e.getMessage());
            System.exit(1);
        }
    }
    public static Config parseConfig(String args, String ifc) {
        Pattern pattern = Pattern.compile(
                "^(?:((?:[\\w.]+)|(?:\\[.+])):)?" +  // host name, or ipv4, or ipv6 address in brackets
                        "(\\d{1,5}):" +              // port
                        "(.+)");                     // config file

        Matcher matcher = pattern.matcher(args);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Malformed arguments - " + args);
        }

        String givenHost = matcher.group(1);
        String givenPort = matcher.group(2);
        String givenConfigFile = matcher.group(3);

        int port = Integer.parseInt(givenPort);

        InetSocketAddress socket;
        if (givenHost != null && !givenHost.isEmpty()) {
            socket = new InetSocketAddress(givenHost, port);
        }
        else {
            socket = new InetSocketAddress(ifc, port);
            givenHost = ifc;
        }

        return new Config(givenHost, port, givenConfigFile, socket);
    }

    static class Config {
        String host;
        int port;
        String file;
        InetSocketAddress socket;

        Config(String host, int port, String file, InetSocketAddress socket) {
            this.host = host;
            this.port = port;
            this.file = file;
            this.socket = socket;
        }
    }
}

起動コマンドからArgumentをparseConfigでパースし、ホストやポート、configファイルなどを割り出しアプリケーションを起動しています

BuildInfoCollector

            new BuildInfoCollector().register();
            new JmxCollector(new File(config.file)).register();

アプリケーションの起動時、collectorsを作り、初期化しています。

BuildInfoCollectorは起動時にその名の通りビルド時のバージョンなどを記録し、MetricFamilySamplesの中に保存しています。

public class BuildInfoCollector extends Collector {
  public List<Collector.MetricFamilySamples> collect() {
    List<Collector.MetricFamilySamples> mfs = new ArrayList<Collector.MetricFamilySamples>();

    GaugeMetricFamily artifactInfo = new GaugeMetricFamily(
            "jmx_exporter_build_info",
            "A metric with a constant '1' value labeled with the version of the JMX exporter.",
            asList("version", "name"));

    Package pkg = this.getClass().getPackage();
    String version = pkg.getImplementationVersion();
    String name = pkg.getImplementationTitle();

    artifactInfo.addMetric(asList(
            version != null ? version : "unknown",
            name != null ? name : "unknown"
    ), 1L);
    mfs.add(artifactInfo);

    return mfs;
  }
}

JmxCollector

起動時に登録したもう一つのコレクターがJmxCollectorです。 こちらがメインにjmx_exporter.yamlに記述されたjmxを記録するクラスになります。

    public JmxCollector(File in) throws IOException, MalformedObjectNameException {
        configFile = in;
        config = loadConfig((Map<String, Object>)new Yaml().load(new FileReader(in)));
        config.lastUpdate = configFile.lastModified();
    }

起動時に指定されたYamlファイルをloadします

    public List<MetricFamilySamples> collect() {
     // Config ファイルの最終編集時間を確認
      if (configFile != null) {
        long mtime = configFile.lastModified();
     // アップデートされていたら、リロードする
        if (mtime > config.lastUpdate) {
          LOGGER.fine("Configuration file changed, reloading...");
          reloadConfig();
        }
      }

      Receiver receiver = new Receiver();
      JmxScraper scraper = new JmxScraper(config.jmxUrl, config.username, config.password, config.ssl,
              config.whitelistObjectNames, config.blacklistObjectNames, receiver, jmxMBeanPropertyCache);
      long start = System.nanoTime();
      double error = 0;
    // startDelaySecondsはコンフィグファイルで指定。アプリケーションの起動をまつ
      if ((config.startDelaySeconds > 0) &&
        ((start - createTimeNanoSecs) / 1000000000L < config.startDelaySeconds)) {
        throw new IllegalStateException("JMXCollector waiting for startDelaySeconds");
      }
      try {
     // メトリクスの収集を始める
        scraper.doScrape();
      } catch (Exception e) {
    ........
    }
  • configファイルが編集されているかチェックをする。
    • アップデートされていたら、リロードする
    • なのでjmx_exporterはホットロードを対応している。更新の際に再起動は必要なし
  • startDelaySecondsはコンフィグファイルで指定。アプリケーションが立ち上がるのを待たせる事ができます。デフォルトは0
  • scraper.doScrape() でmetricsのスクレイプを始めます。

doScrape

    public void doScrape() throws Exception {
        MBeanServerConnection beanConn;
        JMXConnector jmxc = null;
        // 与えられたホストとポートを元にコネクションを張ってみる。
  // SSLがある場合は認証を行う
        if (jmxUrl.isEmpty()) {
          beanConn = ManagementFactory.getPlatformMBeanServer();
        } else {
          Map<String, Object> environment = new HashMap<String, Object>();
          if (username != null && username.length() != 0 && password != null && password.length() != 0) {
            String[] credent = new String[] {username, password};
            environment.put(javax.management.remote.JMXConnector.CREDENTIALS, credent);
          }
          if (ssl) {
              environment.put(Context.SECURITY_PROTOCOL, "ssl");
              SslRMIClientSocketFactory clientSocketFactory = new SslRMIClientSocketFactory();
              environment.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE, clientSocketFactory);
              environment.put("com.sun.jndi.rmi.factory.socket", clientSocketFactory);
          }

          jmxc = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl), environment);
          beanConn = jmxc.getMBeanServerConnection();
        }
        try {
   // configに記載してあるホワイトリストのmbeansをハッシュセットに入れる
            // Query MBean names, see #89 for reasons queryMBeans() is used instead of queryNames()
            Set<ObjectName> mBeanNames = new HashSet<ObjectName>();
            for (ObjectName name : whitelistObjectNames) {
                for (ObjectInstance instance : beanConn.queryMBeans(name, null)) {
                    mBeanNames.add(instance.getObjectName());
                }
            }
   // configに記載してあるブラックリストのmbeansをハッシュセットから除く
            for (ObjectName name : blacklistObjectNames) {
                for (ObjectInstance instance : beanConn.queryMBeans(name, null)) {
                    mBeanNames.remove(instance.getObjectName());
                }
            }

            // Now that we have *only* the whitelisted mBeans, remove any old ones from the cache:
            jmxMBeanPropertyCache.onlyKeepMBeans(mBeanNames);

            for (ObjectName objectName : mBeanNames) {
                long start = System.nanoTime();
     // 残った有効なmbeansを取りに行く
                scrapeBean(beanConn, objectName);
                logger.fine("TIME: " + (System.nanoTime() - start) + " ns for " + objectName.toString());
            }
        } finally {
          if (jmxc != null) {
            jmxc.close();
          }
        }
    }

ここでやっている事は: - 与えられたホストとポートを元にコネクションを張ってみる   - SSLがある場合は認証を行う - ホワイトリストブラックリストを元に、ユーザーに指定されたmbeansのリストを作る - リストを元に、mbeansを取りに行く

 private void scrapeBean(MBeanServerConnection beanConn, ObjectName mbeanName) {
        MBeanInfo info;
        try {
          // コネクションからbeansの情報を取得
          info = beanConn.getMBeanInfo(mbeanName);
        } catch (IOException e) {
          logScrape(mbeanName.toString(), "getMBeanInfo Fail: " + e);
          return;
        } catch (JMException e) {
          logScrape(mbeanName.toString(), "getMBeanInfo Fail: " + e);
          return;
        }
        MBeanAttributeInfo[] attrInfos = info.getAttributes();

        Map<String, MBeanAttributeInfo> name2AttrInfo = new LinkedHashMap<String, MBeanAttributeInfo>();
        for (int idx = 0; idx < attrInfos.length; ++idx) {
            MBeanAttributeInfo attr = attrInfos[idx];
            if (!attr.isReadable()) {
                logScrape(mbeanName, attr, "not readable");
                continue;
            }
    // 有効な値をLinkedHashMapに入れていく
            name2AttrInfo.put(attr.getName(), attr);
        }
        final AttributeList attributes;
        try {
            attributes = beanConn.getAttributes(mbeanName, name2AttrInfo.keySet().toArray(new String[0]));
        } catch (Exception e) {
            logScrape(mbeanName, name2AttrInfo.keySet(), "Fail: " + e);
            return;
        }
   // ここでは、processBeanValueを使い、recursiveにデータを処理していく
        for (Attribute attribute : attributes.asList()) {
            MBeanAttributeInfo attr = name2AttrInfo.get(attribute.getName());
            logScrape(mbeanName, attr, "process");
            processBeanValue(
                    mbeanName.getDomain(),
                    jmxMBeanPropertyCache.getKeyPropertyList(mbeanName),
                    new LinkedList<String>(),
                    attr.getName(),
                    attr.getType(),
                    attr.getDescription(),
                    attribute.getValue()
            );
        }
    }
  • 主にやっている事は、有効なbeansをリストにprocessBeanValueを使い処理していく事です

processBeanValue

   private void processBeanValue(
            String domain,
            LinkedHashMap<String, String> beanProperties,
            LinkedList<String> attrKeys,
            String attrName,
            String attrType,
            String attrDescription,
            Object value) {
        if (value == null) {
            logScrape(domain + beanProperties + attrName, "null");
   // 基本型だったら記録
        } else if (value instanceof Number || value instanceof String || value instanceof Boolean) {
            logScrape(domain + beanProperties + attrName, value.toString());
            this.receiver.recordBean(
                    domain,
                    beanProperties,
                    attrKeys,
                    attrName,
                    attrType,
                    attrDescription,
                    value);
  // そうでなかったらリカーシブルに基本型になるまでサーチしてく
        } else if (value instanceof CompositeData) {
            logScrape(domain + beanProperties + attrName, "compositedata");
            CompositeData composite = (CompositeData) value;
            CompositeType type = composite.getCompositeType();
            attrKeys = new LinkedList<String>(attrKeys);
            attrKeys.add(attrName);
            for(String key : type.keySet()) {
                String typ = type.getType(key).getTypeName();
                Object valu = composite.get(key);
                processBeanValue(
                        domain,
                        beanProperties,
                        attrKeys,
                        key,
                        typ,
                        type.getDescription(),
                        valu);
            }
  • 前の処理の続きで、processBeanValueに入ったbeans
  • 基本型(数字、文字列、ブーリアン)だったら記録
  • 他の型だったらrecursiveで基本型になるまで掘る
public void recordBean(
          String domain,
          LinkedHashMap<String, String> beanProperties,
          LinkedList<String> attrKeys,
          String attrName,
          String attrType,
          String attrDescription,
          Object beanValue) {

        String beanName = domain + angleBrackets(beanProperties.toString()) + angleBrackets(attrKeys.toString());
        // attrDescription tends not to be useful, so give the fully qualified name too.
        String help = attrDescription + " (" + beanName + attrName + ")";
        String attrNameSnakeCase = toSnakeAndLowerCase(attrName);
       // Configの中で書いたruleに基づき、マッチしたものだけ残す
        for (Rule rule : config.rules) {
          Matcher matcher = null;
          String matchName = beanName + (rule.attrNameSnakeCase ? attrNameSnakeCase : attrName);
          if (rule.pattern != null) {
            matcher = rule.pattern.matcher(matchName + ": " + beanValue);
            if (!matcher.matches()) {
              continue;
            }
          }

          Number value;
          if (rule.value != null && !rule.value.isEmpty()) {
            String val = matcher.replaceAll(rule.value);

            try {
              beanValue = Double.valueOf(val);
            } catch (NumberFormatException e) {
              LOGGER.fine("Unable to parse configured value '" + val + "' to number for bean: " + beanName + attrName + ": " + beanValue);
              return;
            }
          }
          if (beanValue instanceof Number) {
            value = ((Number)beanValue).doubleValue() * rule.valueFactor;
          } else if (beanValue instanceof Boolean) {
            value = (Boolean)beanValue ? 1 : 0;
          } else {
            LOGGER.fine("Ignoring unsupported bean: " + beanName + attrName + ": " + beanValue);
            return;
          }

          // If there's no name provided, use default export format.
          if (rule.name == null) {
            defaultExport(domain, beanProperties, attrKeys, rule.attrNameSnakeCase ? attrNameSnakeCase : attrName, help, value, rule.type);
            return;
          }

          // Matcher is set below here due to validation in the constructor.
          String name = safeName(matcher.replaceAll(rule.name));
          if (name.isEmpty()) {
            return;
          }
          if (config.lowercaseOutputName) {
            name = name.toLowerCase();
          }

          // Set the help.
          if (rule.help != null) {
            help = matcher.replaceAll(rule.help);
          }

          // Set the labels.
          ArrayList<String> labelNames = new ArrayList<String>();
          ArrayList<String> labelValues = new ArrayList<String>();
          if (rule.labelNames != null) {
            for (int i = 0; i < rule.labelNames.size(); i++) {
              final String unsafeLabelName = rule.labelNames.get(i);
              final String labelValReplacement = rule.labelValues.get(i);
              try {
                String labelName = safeName(matcher.replaceAll(unsafeLabelName));
                String labelValue = matcher.replaceAll(labelValReplacement);
                if (config.lowercaseOutputLabelNames) {
                  labelName = labelName.toLowerCase();
                }
                if (!labelName.isEmpty() && !labelValue.isEmpty()) {
                  labelNames.add(labelName);
                  labelValues.add(labelValue);
                }
              } catch (Exception e) {
                throw new RuntimeException(
                  format("Matcher '%s' unable to use: '%s' value: '%s'", matcher, unsafeLabelName, labelValReplacement), e);
              }
            }
          }

          // Add to samples.
          LOGGER.fine("add metric sample: " + name + " " + labelNames + " " + labelValues + " " + value.doubleValue());
          addSample(new MetricFamilySamples.Sample(name, labelNames, labelValues, value.doubleValue()), rule.type, help);
          return;
        }
      }

    }
  • Configで記載したruleに基づき、パターンマッチしたbeansのみ残す
  • Labelなど指定されたものを追加していく

最初のJmxCollector classの続きをみてみましょう

 List<MetricFamilySamples> mfsList = new ArrayList<MetricFamilySamples>();
      mfsList.addAll(receiver.metricFamilySamplesMap.values());
      List<MetricFamilySamples.Sample> samples = new ArrayList<MetricFamilySamples.Sample>();
      samples.add(new MetricFamilySamples.Sample(
          "jmx_scrape_duration_seconds", new ArrayList<String>(), new ArrayList<String>(), (System.nanoTime() - start) / 1.0E9));
      mfsList.add(new MetricFamilySamples("jmx_scrape_duration_seconds", Type.GAUGE, "Time this JMX scrape took, in seconds.", samples));

      samples = new ArrayList<MetricFamilySamples.Sample>();
      samples.add(new MetricFamilySamples.Sample(
          "jmx_scrape_error", new ArrayList<String>(), new ArrayList<String>(), error));
      mfsList.add(new MetricFamilySamples("jmx_scrape_error", Type.GAUGE, "Non-zero if this scrape failed.", samples));
      return mfsList;
    }
  • jmx_exporterのメトリクス、jmx_scrape_errorとjmx_scrape_duration_secondsを追加
  • 実は今回のソースコード解析、この二つのデフォルトメトリクスをrenameできないか調べるのが目的でして、できなさそうですね涙

Registor

ここまででメトリクスの収集が完了しました 収集した物をコレクターと呼ぶ ここからはレジストリのコードです *) この部分はdependencyのなかにあります。jmx_exporterの中にはありません。

 // io.prometheus.client.CollectorRegistry#register()
    public void register(Collector m) {
        List<String> names = this.collectorNames(m);
        synchronized(this.collectorsToNames) {
            Iterator var4 = names.iterator();

            String name;
            while(var4.hasNext()) {
                name = (String)var4.next();
                // 既にレジストされているコレクターだったらエラー
                if (this.namesToCollectors.containsKey(name)) {
                    throw new IllegalArgumentException("Collector already registered that provides name: " + name);
                }
            }

            var4 = names.iterator();

            while(var4.hasNext()) {
                name = (String)var4.next();
                // namesToCollectorsに格納
                this.namesToCollectors.put(name, m);
            }
   // collectorsToNamesに格納
            this.collectorsToNames.put(m, names);
        }
    }
  • コレクターに重複がないかチェックしたら二つのHashmapに格納する
  • namesToCollectorsとcollectorsToNamesはその名の通りコレクターと名前のマップ
  • names と collectors は一対一ではない

サーバーの起動

// io.prometheus.jmx.JavaAgent.agentmain
....
            new BuildInfoCollector().register();
            new JmxCollector(new File(config.file)).register();
            DefaultExports.initialize();
            server = new HTTPServer(config.socket, CollectorRegistry.defaultRegistry, true);

アプリケーション起動の最後の一歩

    public HTTPServer(InetSocketAddress addr, CollectorRegistry registry, boolean daemon) throws IOException {
        this.server = HttpServer.create();
        this.server.bind(addr, 3);
        HttpHandler mHandler = new HTTPServer.HTTPMetricHandler(registry);
        this.server.createContext("/", mHandler);
        this.server.createContext("/metrics", mHandler);
        this.executorService = Executors.newFixedThreadPool(5, HTTPServer.DaemonThreadFactory.defaultThreadFactory(daemon));
        this.server.setExecutor(this.executorService);
        this.start(daemon);
    }
....
  • httpサーバーデーモンを起動しています
  • argumentにあるCollectorRegistryが先程の手順で起動したcollectorregistryです
  • HttpHandler mHandler = new HTTPServer.HTTPMetricHandler(registry)を通してregistryにアクセスします

HTTPMetricHandler

    static class HTTPMetricHandler implements HttpHandler {
        private CollectorRegistry registry;
        private final HTTPServer.LocalByteArray response = new HTTPServer.LocalByteArray();

        HTTPMetricHandler(CollectorRegistry registry) {
            this.registry = registry;
        }

        public void handle(HttpExchange t) throws IOException {
            String query = t.getRequestURI().getRawQuery();
            ByteArrayOutputStream response = (ByteArrayOutputStream)this.response.get();
            response.reset();
            OutputStreamWriter osw = new OutputStreamWriter(response);
            TextFormat.write004(osw, this.registry.filteredMetricFamilySamples(HTTPServer.parseQuery(query)));
            osw.flush();
            osw.close();
            response.flush();
            response.close();
            t.getResponseHeaders().set("Content-Type", "text/plain; version=0.0.4; charset=utf-8");
            if (HTTPServer.shouldUseCompression(t)) {
                t.getResponseHeaders().set("Content-Encoding", "gzip");
                t.sendResponseHeaders(200, 0L);
                GZIPOutputStream os = new GZIPOutputStream(t.getResponseBody());
                response.writeTo(os);
                os.close();
            } else {
                t.getResponseHeaders().set("Content-Length", String.valueOf(response.size()));
                t.sendResponseHeaders(200, (long)response.size());
                response.writeTo(t.getResponseBody());
            }

            t.close();
        }
    }
  • this.registry.filteredMetricFamilySamplesでコレクターのMetricFamilySamplesを呼び出します
    public Enumeration<MetricFamilySamples> filteredMetricFamilySamples(Set<String> includedNames) {
        return new CollectorRegistry.MetricFamilySamplesEnumeration(includedNames);
    }
...
...
    class MetricFamilySamplesEnumeration implements Enumeration<MetricFamilySamples> {
        private final Iterator<Collector> collectorIter;
        private Iterator<MetricFamilySamples> metricFamilySamples;
        private MetricFamilySamples next;
        private Set<String> includedNames;

        MetricFamilySamplesEnumeration(Set<String> includedNames) {
            this.includedNames = includedNames;
            this.collectorIter = this.includedCollectorIterator(includedNames);
            this.findNextElement();
        }
        MetricFamilySamplesEnumeration() {
            this(Collections.emptySet());
        }
...
...
        private void findNextElement() {
            this.next = null;

            while(this.metricFamilySamples != null && this.metricFamilySamples.hasNext()) {
                this.next = this.filter((MetricFamilySamples)this.metricFamilySamples.next());
                if (this.next != null) {
                    return;
                }
            }

            if (this.next == null) {
                while(this.collectorIter.hasNext()) {
                    this.metricFamilySamples = ((Collector)this.collectorIter.next()).collect().iterator();

                    while(this.metricFamilySamples.hasNext()) {
                        this.next = this.filter((MetricFamilySamples)this.metricFamilySamples.next());
                        if (this.next != null) {
                            return;
                        }
                    }
                }
            }

        }

collectorIterの中のコレクターからメトリクスを探します この様な方式を用いて、http方式によりアプリケーション内部のjmx portを通じてメトリクスを読み込む事ができます

以上、jmx exporterのソースコード解析でした!

参考:

https://blog.csdn.net/qqqq0199181/article/details/83792364