diff --git a/prometheus_client/multiprocess.py b/prometheus_client/multiprocess.py
index 4f2611bb..ff5edcb0 100644
--- a/prometheus_client/multiprocess.py
+++ b/prometheus_client/multiprocess.py
@@ -1,12 +1,14 @@
 from __future__ import unicode_literals
 from collections import defaultdict
+from functools import wraps
 import glob
 import json
 import os
 
+from .process_lock import lock, unlock, LOCK_EX
 from .metrics_core import Metric
-from .mmap_dict import MmapedDict
+from .mmap_dict import MmapedDict, mmap_key
 from .samples import Sample
 from .utils import floatToGoString
 
@@ -18,6 +20,24 @@ MP_METRIC_HELP = 'Multiprocess metric'
 
 
+def require_metrics_lock(func):
+    @wraps(func)
+    def _wrap(*args, **kwargs):
+        path = os.environ.get('prometheus_multiproc_dir')  # NOTE(review): None if unset -> opaque TypeError below; consider a clear error
+        f = open(os.path.join(path, 'metrics.lock'), 'w')
+        try:
+            lock(f, LOCK_EX)
+            return func(*args, **kwargs)
+        finally:
+            unlock(f)
+            f.close()
+
+    # Expose the undecorated function (functools.wraps only sets __wrapped__
+    # on Python 3) so callers already holding the lock can avoid re-locking.
+    _wrap.__wrapped__ = func
+    return _wrap
+
+
 class MultiProcessCollector(object):
     """Collector for files for multi-process mode."""
 
@@ -31,6 +51,7 @@ def __init__(self, registry, path=None):
         registry.register(self)
 
     @staticmethod
+    @require_metrics_lock
     def merge(files, accumulate=True):
         """Merge metrics from given mmap files.
 
@@ -149,6 +170,7 @@ def collect(self):
         return self.merge(files, accumulate=True)
 
 
+@require_metrics_lock
 def mark_process_dead(pid, path=None):
     """Do bookkeeping for when one process dies in a multi-process setup."""
     if path is None:
@@ -157,3 +179,43 @@ def mark_process_dead(pid, path=None):
         os.remove(f)
     for f in glob.glob(os.path.join(path, 'gauge_liveall_{0}.db'.format(pid))):
         os.remove(f)
+
+    # collect the db files associated with this pid
+    files = glob.glob(os.path.join(path, '*_{0}.db'.format(pid)))
+    if not files:
+        return
+
+    # derive the per-type merge file names
+    merge_files = []
+    for f in files:
+        file_prefix = os.path.basename(f).rsplit('_', 1)[0]
+        merge_file = os.path.join(path, '{0}_merge.db'.format(file_prefix))
+        if merge_file not in merge_files:
+            merge_files.append(merge_file)
+
+        # create and initialise the merge file on first encounter
+        if not os.path.exists(merge_file):
+            MmapedDict(merge_file).close()
+
+    # this function already holds metrics.lock; call the undecorated merge (flock is not re-entrant)
+    metrics = MultiProcessCollector.merge.__wrapped__(files + merge_files, accumulate=False)
+    typ_metrics_dict = defaultdict(list)
+    for metric in metrics:
+        typ_metrics_dict[metric.type].append(metric)
+
+    # write the merged samples into the matching merge file
+    for merge_file in merge_files:
+        typ = os.path.basename(merge_file).split('_')[0]
+        d = MmapedDict(merge_file)
+        for metric in typ_metrics_dict[typ]:
+            for sample in metric.samples:
+                labels, values = (), ()
+                if sample.labels:
+                    labels, values = zip(*sample.labels.items())
+                key = mmap_key(metric.name, sample.name, labels, values)
+                d.write_value(key, sample.value)
+        d.close()
+
+    # remove the dead process's db files
+    for f in files:
+        os.remove(f)
diff --git a/prometheus_client/process_lock.py b/prometheus_client/process_lock.py
new file mode 100644
index 00000000..24ddbd0a
--- /dev/null
+++ b/prometheus_client/process_lock.py
@@ -0,0 +1,99 @@
+#!/usr/bin/env python
+# coding=utf-8
+
+import os
+
+__all__ = ('LOCK_EX', 'LOCK_SH', 'LOCK_NB', 'lock', 'unlock')
+
+
+def _fd(f):
+    return f.fileno() if hasattr(f, 'fileno') else f
+
+
+if os.name == 'nt':
+    import msvcrt
+    from ctypes import (sizeof, c_ulong, c_void_p, c_int64,
+                        Structure, Union, POINTER, windll, byref)
+    from ctypes.wintypes import BOOL, DWORD, HANDLE
+
+    LOCK_SH = 0  # the default
+    LOCK_NB = 0x1  # LOCKFILE_FAIL_IMMEDIATELY
+    LOCK_EX = 0x2  # LOCKFILE_EXCLUSIVE_LOCK
+
+    if sizeof(c_ulong) != sizeof(c_void_p):
+        ULONG_PTR = c_int64
+    else:
+        ULONG_PTR = c_ulong
+    PVOID = c_void_p
+
+
+    class _OFFSET(Structure):
+        _fields_ = [
+            ('Offset', DWORD),
+            ('OffsetHigh', DWORD)]
+
+
+    class _OFFSET_UNION(Union):
+        _anonymous_ = ['_offset']
+        _fields_ = [
+            ('_offset', _OFFSET),
+            ('Pointer', PVOID)]
+
+
+    class OVERLAPPED(Structure):
+        _anonymous_ = ['_offset_union']
+        _fields_ = [
+            ('Internal', ULONG_PTR),
+            ('InternalHigh', ULONG_PTR),
+            ('_offset_union', _OFFSET_UNION),
+            ('hEvent', HANDLE)]
+
+
+    LPOVERLAPPED = POINTER(OVERLAPPED)
+
+    LockFileEx = windll.kernel32.LockFileEx
+    LockFileEx.restype = BOOL
+    LockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, DWORD, LPOVERLAPPED]
+    UnlockFileEx = windll.kernel32.UnlockFileEx
+    UnlockFileEx.restype = BOOL
+    UnlockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, LPOVERLAPPED]
+
+
+    def lock(f, flags):
+        hfile = msvcrt.get_osfhandle(_fd(f))
+        overlapped = OVERLAPPED()
+        ret = LockFileEx(hfile, flags, 0, 0, 0xFFFF0000, byref(overlapped))
+        return bool(ret)
+
+
+    def unlock(f):
+        hfile = msvcrt.get_osfhandle(_fd(f))
+        overlapped = OVERLAPPED()
+        ret = UnlockFileEx(hfile, 0, 0, 0xFFFF0000, byref(overlapped))
+        return bool(ret)
+else:
+    try:
+        import fcntl
+
+        LOCK_SH = fcntl.LOCK_SH  # shared lock
+        LOCK_NB = fcntl.LOCK_NB  # non-blocking
+        LOCK_EX = fcntl.LOCK_EX
+    except (ImportError, AttributeError):
+        LOCK_EX = LOCK_SH = LOCK_NB = 0
+
+
+        def lock(f, flags):
+            return False
+
+
+        def unlock(f):
+            return True
+    else:
+        def lock(f, flags):
+            ret = fcntl.flock(_fd(f), flags)
+            return ret == 0
+
+
+        def unlock(f):
+            ret = fcntl.flock(_fd(f), fcntl.LOCK_UN)
+            return ret == 0
diff --git a/prometheus_client/values.py b/prometheus_client/values.py
index f572dcf7..48bad02d 100644
--- a/prometheus_client/values.py
+++ b/prometheus_client/values.py
@@ -1,6 +1,7 @@
 from __future__ import unicode_literals
 
 import os
+import psutil  # NOTE(review): new third-party runtime dependency; the client is otherwise stdlib-only — confirm this is acceptable
 from threading import Lock
 
 from .mmap_dict import mmap_key, MmapedDict
@@ -28,7 +29,17 @@ def get(self):
         return self._value
 
 
-def MultiProcessValue(process_identifier=os.getpid):
+def default_process_identifier():
+    """
+    PIDs are recycled, so two processes may get the same identifier from
+    os.getpid alone; append the process create time to disambiguate.
+    """
+    pid = os.getpid()
+    p = psutil.Process(pid)
+    return "{}_{}".format(pid, int(p.create_time()))
+
+
+def MultiProcessValue(process_identifier=default_process_identifier):
     """Returns a MmapedValue class based on a process_identifier function.
 
     The 'process_identifier' function MUST comply with this simple rule: