本文介绍了Prometheus/CLIENT_PYTHON:如何在不重新启动的情况下分配新的注册表?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我已经编写了一个小脚本来检测FlaskTM应用程序,我想编写单元测试,其中每个测试都可以针对模拟的FlaskTM应用程序编写请求,并测试指标,而不必处理来自以前测试方法的指标/请求,例如:
def test_grouped_codes():
app = create_app()
instrument(app)
# test stuff
但我无法&q;重置注册表,因此我总是在CollectorRegistry&q;中收到错误&t;重复的时间序列。
如何在运行时重置Prometheus Python客户端库的注册表(或将其设置为空注册表)?
我尝试了以下方法,但不起作用:
def create_app():
app = Flask(__name__)
registry = CollectorRegistry() # Create new registry.
prometheus_client.REGISTRY = registry # Try to override global registry.
prometheus_client.registry.REGISTRY = registry # Try to override global registry.
@app.route("/")
def home():
return "Hello World!"
# More functions ...
@app.route("/metrics")
@FlaskInstrumentator.do_not_track()
def metrics():
data = generate_latest(registry)
headers = {
"Content-Type": CONTENT_TYPE_LATEST,
"Content-Length": str(len(data))}
return data, 200, headers
return app
我在堆栈溢出here上发现了以下QA。@Brian-巴西建议在模块级别上声明指标,但这样我就必须硬编码我想避免的标签名称。有些使用handler
,其他使用method
或path
,因此我希望保持可自定义。
推荐答案
好的,感谢thisAnswer by@Xitrum&Quot;Answer by@Xitrum&Quot;我找到了解决方案:
collectors = list(REGISTRY._collector_to_names.keys())
for collector in collectors:
REGISTRY.unregister(collector)
现在所有测试都可以从自己的注册表开始:
def test_metrics_endpoint_availability():
app = create_app()
FlaskInstrumentator(app).instrument()
client = app.test_client()
response = client.get("/")
response = client.get("/metrics")
# Test stuff
def test_grouped_status_codes():
app = create_app()
FlaskInstrumentator(app).instrument()
client = app.test_client()
client.get("/does_not_exist") # Should be ignored.
client.get("/does_not_exist") # Should be ignored.
client.post("/")
client.post("/")
# Test stuff
这篇关于Prometheus/CLIENT_PYTHON:如何在不重新启动的情况下分配新的注册表?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!