jmx exporter ソースコード解析
Jmx_exporterとは?
JMXのMBeansから、Prometheus用の形式に変換し、メトリクスを出力することができるExporterです。
どうやって使うの?
普通のアプリケーションはもちろん、KafkaやHDFSなどのオープンソースでもJMX用のポートを開ける物であれば簡単にアプリケーションの各種パラメーターを指定したポートに出力し、prometheusを使ってモニタリングシステムを組む事ができます
使い方は公式githubにて詳しく説明があるので割愛します。
ソースコード解析
凄くシンプルなプログラムなので、入り口から順に追って読んでみましょう
アプリケーション起動
入り口はここです: jmx_exporter/jmx_prometheus_javaagent/src/main/java/io/prometheus/jmx/JavaAgent.java#agentmain
public static void agentmain(String agentArgument, Instrumentation instrumentation) throws Exception { premain(agentArgument, instrumentation); } public static void premain(String agentArgument, Instrumentation instrumentation) throws Exception { // Bind to all interfaces by default (this includes IPv6). String host = "0.0.0.0"; try { Config config = parseConfig(agentArgument, host); new BuildInfoCollector().register(); new JmxCollector(new File(config.file)).register(); DefaultExports.initialize(); server = new HTTPServer(config.socket, CollectorRegistry.defaultRegistry, true); } catch (IllegalArgumentException e) { System.err.println("Usage: -javaagent:/path/to/JavaAgent.jar=[host:]<port>:<yaml configuration file> " + e.getMessage()); System.exit(1); } } public static Config parseConfig(String args, String ifc) { Pattern pattern = Pattern.compile( "^(?:((?:[\\w.]+)|(?:\\[.+])):)?" + // host name, or ipv4, or ipv6 address in brackets "(\\d{1,5}):" + // port "(.+)"); // config file Matcher matcher = pattern.matcher(args); if (!matcher.matches()) { throw new IllegalArgumentException("Malformed arguments - " + args); } String givenHost = matcher.group(1); String givenPort = matcher.group(2); String givenConfigFile = matcher.group(3); int port = Integer.parseInt(givenPort); InetSocketAddress socket; if (givenHost != null && !givenHost.isEmpty()) { socket = new InetSocketAddress(givenHost, port); } else { socket = new InetSocketAddress(ifc, port); givenHost = ifc; } return new Config(givenHost, port, givenConfigFile, socket); } static class Config { String host; int port; String file; InetSocketAddress socket; Config(String host, int port, String file, InetSocketAddress socket) { this.host = host; this.port = port; this.file = file; this.socket = socket; } } }
起動コマンドからArgumentをparseConfigでパースし、ホストやポート、configファイルなどを割り出しアプリケーションを起動しています
BuildInfoCollector
new BuildInfoCollector().register(); new JmxCollector(new File(config.file)).register();
アプリケーションの起動時、collectorsを作り、初期化しています。
BuildInfoCollectorは起動時にその名の通りビルド時のバージョンなどを記録し、MetricFamilySamplesの中に保存しています。
public class BuildInfoCollector extends Collector { public List<Collector.MetricFamilySamples> collect() { List<Collector.MetricFamilySamples> mfs = new ArrayList<Collector.MetricFamilySamples>(); GaugeMetricFamily artifactInfo = new GaugeMetricFamily( "jmx_exporter_build_info", "A metric with a constant '1' value labeled with the version of the JMX exporter.", asList("version", "name")); Package pkg = this.getClass().getPackage(); String version = pkg.getImplementationVersion(); String name = pkg.getImplementationTitle(); artifactInfo.addMetric(asList( version != null ? version : "unknown", name != null ? name : "unknown" ), 1L); mfs.add(artifactInfo); return mfs; } }
JmxCollector
起動時に登録したもう一つのコレクターがJmxCollectorです。 こちらがメインにjmx_exporter.yamlに記述されたjmxを記録するクラスになります。
public JmxCollector(File in) throws IOException, MalformedObjectNameException { configFile = in; config = loadConfig((Map<String, Object>)new Yaml().load(new FileReader(in))); config.lastUpdate = configFile.lastModified(); }
起動時に指定されたYamlファイルをloadします
public List<MetricFamilySamples> collect() { // Config ファイルの最終編集時間を確認 if (configFile != null) { long mtime = configFile.lastModified(); // アップデートされていたら、リロードする if (mtime > config.lastUpdate) { LOGGER.fine("Configuration file changed, reloading..."); reloadConfig(); } } Receiver receiver = new Receiver(); JmxScraper scraper = new JmxScraper(config.jmxUrl, config.username, config.password, config.ssl, config.whitelistObjectNames, config.blacklistObjectNames, receiver, jmxMBeanPropertyCache); long start = System.nanoTime(); double error = 0; // startDelaySecondsはコンフィグファイルで指定。アプリケーションの起動をまつ if ((config.startDelaySeconds > 0) && ((start - createTimeNanoSecs) / 1000000000L < config.startDelaySeconds)) { throw new IllegalStateException("JMXCollector waiting for startDelaySeconds"); } try { // メトリクスの収集を始める scraper.doScrape(); } catch (Exception e) { ........ }
- configファイルが編集されているかチェックをする。
- startDelaySecondsはコンフィグファイルで指定。アプリケーションが立ち上がるのを待たせる事ができます。デフォルトは0
- scraper.doScrape() でmetricsのスクレイプを始めます。
doScrape
public void doScrape() throws Exception { MBeanServerConnection beanConn; JMXConnector jmxc = null; // 与えられたホストとポートを元にコネクションを張ってみる。 // SSLがある場合は認証を行う if (jmxUrl.isEmpty()) { beanConn = ManagementFactory.getPlatformMBeanServer(); } else { Map<String, Object> environment = new HashMap<String, Object>(); if (username != null && username.length() != 0 && password != null && password.length() != 0) { String[] credent = new String[] {username, password}; environment.put(javax.management.remote.JMXConnector.CREDENTIALS, credent); } if (ssl) { environment.put(Context.SECURITY_PROTOCOL, "ssl"); SslRMIClientSocketFactory clientSocketFactory = new SslRMIClientSocketFactory(); environment.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE, clientSocketFactory); environment.put("com.sun.jndi.rmi.factory.socket", clientSocketFactory); } jmxc = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl), environment); beanConn = jmxc.getMBeanServerConnection(); } try { // configに記載してあるホワイトリストのmbeansをハッシュセットに入れる // Query MBean names, see #89 for reasons queryMBeans() is used instead of queryNames() Set<ObjectName> mBeanNames = new HashSet<ObjectName>(); for (ObjectName name : whitelistObjectNames) { for (ObjectInstance instance : beanConn.queryMBeans(name, null)) { mBeanNames.add(instance.getObjectName()); } } // configに記載してあるブラックリストのmbeansをハッシュセットから除く for (ObjectName name : blacklistObjectNames) { for (ObjectInstance instance : beanConn.queryMBeans(name, null)) { mBeanNames.remove(instance.getObjectName()); } } // Now that we have *only* the whitelisted mBeans, remove any old ones from the cache: jmxMBeanPropertyCache.onlyKeepMBeans(mBeanNames); for (ObjectName objectName : mBeanNames) { long start = System.nanoTime(); // 残った有効なmbeansを取りに行く scrapeBean(beanConn, objectName); logger.fine("TIME: " + (System.nanoTime() - start) + " ns for " + objectName.toString()); } } finally { if (jmxc != null) { jmxc.close(); } } }
ここでやっている事は: - 与えられたホストとポートを元にコネクションを張ってみる - SSLがある場合は認証を行う - ホワイトリストとブラックリストを元に、ユーザーに指定されたmbeansのリストを作る - リストを元に、mbeansを取りに行く
private void scrapeBean(MBeanServerConnection beanConn, ObjectName mbeanName) { MBeanInfo info; try { // コネクションからbeansの情報を取得 info = beanConn.getMBeanInfo(mbeanName); } catch (IOException e) { logScrape(mbeanName.toString(), "getMBeanInfo Fail: " + e); return; } catch (JMException e) { logScrape(mbeanName.toString(), "getMBeanInfo Fail: " + e); return; } MBeanAttributeInfo[] attrInfos = info.getAttributes(); Map<String, MBeanAttributeInfo> name2AttrInfo = new LinkedHashMap<String, MBeanAttributeInfo>(); for (int idx = 0; idx < attrInfos.length; ++idx) { MBeanAttributeInfo attr = attrInfos[idx]; if (!attr.isReadable()) { logScrape(mbeanName, attr, "not readable"); continue; } // 有効な値をLinkedHashMapに入れていく name2AttrInfo.put(attr.getName(), attr); } final AttributeList attributes; try { attributes = beanConn.getAttributes(mbeanName, name2AttrInfo.keySet().toArray(new String[0])); } catch (Exception e) { logScrape(mbeanName, name2AttrInfo.keySet(), "Fail: " + e); return; } // ここでは、processBeanValueを使い、recursiveにデータを処理していく for (Attribute attribute : attributes.asList()) { MBeanAttributeInfo attr = name2AttrInfo.get(attribute.getName()); logScrape(mbeanName, attr, "process"); processBeanValue( mbeanName.getDomain(), jmxMBeanPropertyCache.getKeyPropertyList(mbeanName), new LinkedList<String>(), attr.getName(), attr.getType(), attr.getDescription(), attribute.getValue() ); } }
- 主にやっている事は、有効なbeansをリストにprocessBeanValueを使い処理していく事です
processBeanValue
private void processBeanValue( String domain, LinkedHashMap<String, String> beanProperties, LinkedList<String> attrKeys, String attrName, String attrType, String attrDescription, Object value) { if (value == null) { logScrape(domain + beanProperties + attrName, "null"); // 基本型だったら記録 } else if (value instanceof Number || value instanceof String || value instanceof Boolean) { logScrape(domain + beanProperties + attrName, value.toString()); this.receiver.recordBean( domain, beanProperties, attrKeys, attrName, attrType, attrDescription, value); // そうでなかったらリカーシブルに基本型になるまでサーチしてく } else if (value instanceof CompositeData) { logScrape(domain + beanProperties + attrName, "compositedata"); CompositeData composite = (CompositeData) value; CompositeType type = composite.getCompositeType(); attrKeys = new LinkedList<String>(attrKeys); attrKeys.add(attrName); for(String key : type.keySet()) { String typ = type.getType(key).getTypeName(); Object valu = composite.get(key); processBeanValue( domain, beanProperties, attrKeys, key, typ, type.getDescription(), valu); }
- 前の処理の続きで、processBeanValueに入ったbeans
- 基本型(数字、文字列、ブーリアン)だったら記録
- 他の型だったらrecursiveで基本型になるまで掘る
public void recordBean( String domain, LinkedHashMap<String, String> beanProperties, LinkedList<String> attrKeys, String attrName, String attrType, String attrDescription, Object beanValue) { String beanName = domain + angleBrackets(beanProperties.toString()) + angleBrackets(attrKeys.toString()); // attrDescription tends not to be useful, so give the fully qualified name too. String help = attrDescription + " (" + beanName + attrName + ")"; String attrNameSnakeCase = toSnakeAndLowerCase(attrName); // Configの中で書いたruleに基づき、マッチしたものだけ残す for (Rule rule : config.rules) { Matcher matcher = null; String matchName = beanName + (rule.attrNameSnakeCase ? attrNameSnakeCase : attrName); if (rule.pattern != null) { matcher = rule.pattern.matcher(matchName + ": " + beanValue); if (!matcher.matches()) { continue; } } Number value; if (rule.value != null && !rule.value.isEmpty()) { String val = matcher.replaceAll(rule.value); try { beanValue = Double.valueOf(val); } catch (NumberFormatException e) { LOGGER.fine("Unable to parse configured value '" + val + "' to number for bean: " + beanName + attrName + ": " + beanValue); return; } } if (beanValue instanceof Number) { value = ((Number)beanValue).doubleValue() * rule.valueFactor; } else if (beanValue instanceof Boolean) { value = (Boolean)beanValue ? 1 : 0; } else { LOGGER.fine("Ignoring unsupported bean: " + beanName + attrName + ": " + beanValue); return; } // If there's no name provided, use default export format. if (rule.name == null) { defaultExport(domain, beanProperties, attrKeys, rule.attrNameSnakeCase ? attrNameSnakeCase : attrName, help, value, rule.type); return; } // Matcher is set below here due to validation in the constructor. String name = safeName(matcher.replaceAll(rule.name)); if (name.isEmpty()) { return; } if (config.lowercaseOutputName) { name = name.toLowerCase(); } // Set the help. if (rule.help != null) { help = matcher.replaceAll(rule.help); } // Set the labels. ArrayList<String> labelNames = new ArrayList<String>(); ArrayList<String> labelValues = new ArrayList<String>(); if (rule.labelNames != null) { for (int i = 0; i < rule.labelNames.size(); i++) { final String unsafeLabelName = rule.labelNames.get(i); final String labelValReplacement = rule.labelValues.get(i); try { String labelName = safeName(matcher.replaceAll(unsafeLabelName)); String labelValue = matcher.replaceAll(labelValReplacement); if (config.lowercaseOutputLabelNames) { labelName = labelName.toLowerCase(); } if (!labelName.isEmpty() && !labelValue.isEmpty()) { labelNames.add(labelName); labelValues.add(labelValue); } } catch (Exception e) { throw new RuntimeException( format("Matcher '%s' unable to use: '%s' value: '%s'", matcher, unsafeLabelName, labelValReplacement), e); } } } // Add to samples. LOGGER.fine("add metric sample: " + name + " " + labelNames + " " + labelValues + " " + value.doubleValue()); addSample(new MetricFamilySamples.Sample(name, labelNames, labelValues, value.doubleValue()), rule.type, help); return; } } }
- Configで記載したruleに基づき、パターンマッチしたbeansのみ残す
- Labelなど指定されたものを追加していく
最初のJmxCollector classの続きをみてみましょう
List<MetricFamilySamples> mfsList = new ArrayList<MetricFamilySamples>(); mfsList.addAll(receiver.metricFamilySamplesMap.values()); List<MetricFamilySamples.Sample> samples = new ArrayList<MetricFamilySamples.Sample>(); samples.add(new MetricFamilySamples.Sample( "jmx_scrape_duration_seconds", new ArrayList<String>(), new ArrayList<String>(), (System.nanoTime() - start) / 1.0E9)); mfsList.add(new MetricFamilySamples("jmx_scrape_duration_seconds", Type.GAUGE, "Time this JMX scrape took, in seconds.", samples)); samples = new ArrayList<MetricFamilySamples.Sample>(); samples.add(new MetricFamilySamples.Sample( "jmx_scrape_error", new ArrayList<String>(), new ArrayList<String>(), error)); mfsList.add(new MetricFamilySamples("jmx_scrape_error", Type.GAUGE, "Non-zero if this scrape failed.", samples)); return mfsList; }
- jmx_exporterのメトリクス、jmx_scrape_errorとjmx_scrape_duration_secondsを追加
- 実は今回のソースコード解析、この二つのデフォルトメトリクスをrenameできないか調べるのが目的でして、できなさそうですね涙
Registor
ここまででメトリクスの収集が完了しました 収集した物をコレクターと呼ぶ ここからはレジストリのコードです *) この部分はdependencyのなかにあります。jmx_exporterの中にはありません。
// io.prometheus.client.CollectorRegistry#register() public void register(Collector m) { List<String> names = this.collectorNames(m); synchronized(this.collectorsToNames) { Iterator var4 = names.iterator(); String name; while(var4.hasNext()) { name = (String)var4.next(); // 既にレジストされているコレクターだったらエラー if (this.namesToCollectors.containsKey(name)) { throw new IllegalArgumentException("Collector already registered that provides name: " + name); } } var4 = names.iterator(); while(var4.hasNext()) { name = (String)var4.next(); // namesToCollectorsに格納 this.namesToCollectors.put(name, m); } // collectorsToNamesに格納 this.collectorsToNames.put(m, names); } }
- コレクターに重複がないかチェックしたら二つのHashmapに格納する
- namesToCollectorsとcollectorsToNamesはその名の通りコレクターと名前のマップ
- names と collectors は一対一ではない
サーバーの起動
// io.prometheus.jmx.JavaAgent.agentmain .... new BuildInfoCollector().register(); new JmxCollector(new File(config.file)).register(); DefaultExports.initialize(); server = new HTTPServer(config.socket, CollectorRegistry.defaultRegistry, true);
アプリケーション起動の最後の一歩
public HTTPServer(InetSocketAddress addr, CollectorRegistry registry, boolean daemon) throws IOException { this.server = HttpServer.create(); this.server.bind(addr, 3); HttpHandler mHandler = new HTTPServer.HTTPMetricHandler(registry); this.server.createContext("/", mHandler); this.server.createContext("/metrics", mHandler); this.executorService = Executors.newFixedThreadPool(5, HTTPServer.DaemonThreadFactory.defaultThreadFactory(daemon)); this.server.setExecutor(this.executorService); this.start(daemon); } ....
- httpサーバーデーモンを起動しています
- argumentにあるCollectorRegistryが先程の手順で起動したcollectorregistryです
- HttpHandler mHandler = new HTTPServer.HTTPMetricHandler(registry)を通してregistryにアクセスします
HTTPMetricHandler
static class HTTPMetricHandler implements HttpHandler { private CollectorRegistry registry; private final HTTPServer.LocalByteArray response = new HTTPServer.LocalByteArray(); HTTPMetricHandler(CollectorRegistry registry) { this.registry = registry; } public void handle(HttpExchange t) throws IOException { String query = t.getRequestURI().getRawQuery(); ByteArrayOutputStream response = (ByteArrayOutputStream)this.response.get(); response.reset(); OutputStreamWriter osw = new OutputStreamWriter(response); TextFormat.write004(osw, this.registry.filteredMetricFamilySamples(HTTPServer.parseQuery(query))); osw.flush(); osw.close(); response.flush(); response.close(); t.getResponseHeaders().set("Content-Type", "text/plain; version=0.0.4; charset=utf-8"); if (HTTPServer.shouldUseCompression(t)) { t.getResponseHeaders().set("Content-Encoding", "gzip"); t.sendResponseHeaders(200, 0L); GZIPOutputStream os = new GZIPOutputStream(t.getResponseBody()); response.writeTo(os); os.close(); } else { t.getResponseHeaders().set("Content-Length", String.valueOf(response.size())); t.sendResponseHeaders(200, (long)response.size()); response.writeTo(t.getResponseBody()); } t.close(); } }
- this.registry.filteredMetricFamilySamplesでコレクターのMetricFamilySamplesを呼び出します
public Enumeration<MetricFamilySamples> filteredMetricFamilySamples(Set<String> includedNames) { return new CollectorRegistry.MetricFamilySamplesEnumeration(includedNames); } ... ... class MetricFamilySamplesEnumeration implements Enumeration<MetricFamilySamples> { private final Iterator<Collector> collectorIter; private Iterator<MetricFamilySamples> metricFamilySamples; private MetricFamilySamples next; private Set<String> includedNames; MetricFamilySamplesEnumeration(Set<String> includedNames) { this.includedNames = includedNames; this.collectorIter = this.includedCollectorIterator(includedNames); this.findNextElement(); } MetricFamilySamplesEnumeration() { this(Collections.emptySet()); } ... ... private void findNextElement() { this.next = null; while(this.metricFamilySamples != null && this.metricFamilySamples.hasNext()) { this.next = this.filter((MetricFamilySamples)this.metricFamilySamples.next()); if (this.next != null) { return; } } if (this.next == null) { while(this.collectorIter.hasNext()) { this.metricFamilySamples = ((Collector)this.collectorIter.next()).collect().iterator(); while(this.metricFamilySamples.hasNext()) { this.next = this.filter((MetricFamilySamples)this.metricFamilySamples.next()); if (this.next != null) { return; } } } } }
collectorIterの中のコレクターからメトリクスを探します この様な方式を用いて、http方式によりアプリケーション内部のjmx portを通じてメトリクスを読み込む事ができます
参考: