1: <?php declare(strict_types=1);
2:
3: use Daphnie\Collector;
4:
5: /**
6: * Copyright (C) Apis Networks, Inc - All Rights Reserved.
7: *
8: * Unauthorized copying of this file, via any medium, is
9: * strictly prohibited without consent. Any dissemination of
10: * material herein is prohibited.
11: *
12: * For licensing inquiries email <licensing@apisnetworks.com>
13: *
14: * Written by Matt Saladna <matt@apisnetworks.com>, May 2017
15: */
16:
class Telemetry_Module extends Module_Skeleton
{
    /**
     * Lazily created collector; populated on first getCollector() call
     *
     * @var Collector|null
     */
    private $collector;

    protected $exportedFunctions = [
        '*'         => PRIVILEGE_ADMIN,
        'metrics'   => PRIVILEGE_SITE | PRIVILEGE_ADMIN,
        'enabled'   => PRIVILEGE_SITE | PRIVILEGE_ADMIN,
        'get'       => PRIVILEGE_ADMIN | PRIVILEGE_SITE,
        'range'     => PRIVILEGE_SITE | PRIVILEGE_ADMIN,
        'histogram' => PRIVILEGE_SITE | PRIVILEGE_ADMIN,
        'has'       => PRIVILEGE_SITE | PRIVILEGE_ADMIN,
        'interval'  => PRIVILEGE_SITE | PRIVILEGE_ADMIN
    ];

    /**
     * When telemetry is disabled, only enabled() remains exported
     */
    public function __construct()
    {
        parent::__construct();
        if (!TELEMETRY_ENABLED) {
            $this->exportedFunctions = [
                'enabled' => PRIVILEGE_SITE | PRIVILEGE_ADMIN,
                '*'       => PRIVILEGE_NONE
            ];
        }
    }

    /**
     * Telemetry is enabled
     *
     * Admin callers see the global TELEMETRY_ENABLED flag; all other
     * callers additionally require the "metrics,enabled" service value.
     *
     * @return bool
     */
    public function enabled(): bool
    {
        if ($this->permission_level & PRIVILEGE_ADMIN) {
            return (bool)TELEMETRY_ENABLED;
        }

        return TELEMETRY_ENABLED && (bool)$this->getServiceValue('metrics', 'enabled');
    }

    /**
     * Get latest metric value
     *
     * Performs a partial scan in last 12 hours
     *
     * @param string|array $metric  metric name (dot notation)
     * @param int|null     $site_id site ID; site/user callers are always pinned to their own site
     * @return int|int[]|null null when a site/user caller requests a foreign site ID
     */
    public function get($metric, ?int $site_id = null)
    {
        if ($this->permission_level & (PRIVILEGE_USER | PRIVILEGE_SITE)) {
            if ($site_id && $site_id !== $this->site_id) {
                error('Cannot specify site ID');

                return null;
            }
            $site_id = $this->site_id;
        }

        return $this->getCollector()->get($metric, $site_id);
    }

    /**
     * Metric exists
     *
     * @param string $metric
     * @return bool true when the metric broker resolves the name to a non-null value
     */
    public function has(string $metric): bool
    {
        return null !== \Daphnie\MetricBroker::resolve($metric);
    }

    /**
     * Get metric range
     *
     * @param             $metric
     * @param int         $begin    when negative, now minus $begin
     * @param int|null    $end
     * @param int|null    $site_id  site ID; site/user callers are always pinned to their own site
     * @param string|bool $summable sum (bool) or interval ranges to sum as (string)
     * @return int[]|int|null null when a site/user caller requests a foreign site ID
     */
    public function range($metric, int $begin, ?int $end = null, ?int $site_id = null, $summable = true)
    {
        if ($this->permission_level & (PRIVILEGE_USER | PRIVILEGE_SITE)) {
            if ($site_id && $site_id !== $this->site_id) {
                error('Cannot specify site ID');

                return null;
            }
            $site_id = $this->site_id;
        }

        return $this->getCollector()->range($metric, $begin, $end, $site_id, $summable);
    }

    /**
     * Aggregate metrics into periodic intervals
     *
     * @param string|int $metric  metric name or ID
     * @param int        $begin   begin timestamp
     * @param int|null   $end     ending timestamp
     * @param int|null   $site_id site ID (left untyped so existing callers are unaffected)
     * @param int        $size    interval size, within [1, 2^31]
     * @return mixed
     */
    public function interval($metric, int $begin, ?int $end = null, $site_id = null, int $size = 86400)
    {
        if ($this->permission_level & (PRIVILEGE_USER | PRIVILEGE_SITE)) {
            if ($site_id && $site_id !== $this->site_id) {
                error('Cannot specify site ID');

                return null;
            }
            $site_id = $this->site_id;
        }

        // 2 ** 31, not 2e31: the latter is the float 2x10^31, which no
        // integer can exceed, leaving the upper bound unenforced
        if ($size < 1 || $size > 2 ** 31) {
            return error("Invalid grouping size");
        }

        if ($size < CRON_RESOLUTION) {
            warn("Interval smaller than [cron] => resolution time %d seconds", CRON_RESOLUTION);
        }

        return $this->getCollector()->interval($metric, $begin, $end, $site_id, $size);
    }

    /**
     * Get metric histogram
     *
     * @param          $metric
     * @param int      $begin
     * @param int|null $end
     * @param int|null $site_id site ID; site/user callers are always pinned to their own site
     * @param int      $buckets number of buckets to bin, within [2,50]
     * @param int      $min     minimum value to include
     * @param int      $max     maximum value to include
     * @return int[]|int|null
     */
    public function histogram(
        $metric,
        int $begin,
        ?int $end = null,
        ?int $site_id = null,
        int $buckets = 5,
        int $min = 0,
        int $max = 1024
    ) {
        if ($this->permission_level & (PRIVILEGE_USER | PRIVILEGE_SITE)) {
            if ($site_id && $site_id !== $this->site_id) {
                error('Cannot specify site ID');

                return null;
            }
            $site_id = $this->site_id;
        }

        if ($buckets > 50 || $buckets < 2) {
            return error("Buckets must be within [2,50]");
        }

        // $min/$max are non-nullable ints, no null guard necessary
        if ($min > $max) {
            error("Minimum value %(min)d may not exceed %(max)d", [
                'min' => $min,
                'max' => $max
            ]);

            return null;
        }

        return $this->getCollector()->histogram($metric, $begin, $end, $site_id, $buckets, $min, $max);
    }

    /**
     * Get collector instance
     *
     * Created on first use and reused for the lifetime of this module instance
     *
     * @return Collector
     */
    private function getCollector(): Collector
    {
        if (!isset($this->collector)) {
            $this->collector = new Collector(\PostgreSQL::pdo());
        }

        return $this->collector;
    }

    /**
     * Drop metric value from database
     *
     * Deletes rows matching the metric's attribute ID from "metrics", or
     * from "metric_attributes" when $rekey is set.
     *
     * @param string $metric metric to discard
     * @param bool   $rekey  rekey attribute metadata on next run
     * @return bool false when the metric is unknown or the delete fails
     */
    public function drop_metric(string $metric, bool $rekey = false): bool
    {
        if (null === ($id = $this->getCollector()->metricAsId($metric))) {
            return false;
        }

        $db = \PostgreSQL::pdo();

        // table name is chosen from a fixed set, never from user input
        $table = $rekey ? 'metric_attributes' : 'metrics';
        $chunker = new \Daphnie\Chunker($db);
        $chunker->decompressRange(null);
        try {
            $stmt = $db->prepare("DELETE FROM $table WHERE attr_id = :attr_id");
            $ret = $stmt->execute([':attr_id' => $id]);
        } finally {
            // always pair decompressRange() with release(), even if the
            // delete throws, so the chunker is not left held
            $chunker->release();
        }

        return $ret ?: error('Failed to drop metric %(metric)s: %(err)s',
            ['metric' => $metric, 'err' => array_get($stmt->errorInfo(), 2, '')]
        );
    }

    /**
     * Timescale chunk statistics
     *
     * @return array
     */
    public function chunks(): array
    {
        return (new \Daphnie\Chunker(\PostgreSQL::pdo()))->getChunkStats();
    }

    /**
     * Get all metric symbols
     *
     * @return array keys of the collector's metric listing
     */
    public function metrics(): array
    {
        return array_keys($this->getCollector()->all());
    }

    /**
     * Get metric compression usage
     *
     * @return array first result row with *_bytes columns formatted, empty on failure
     */
    public function db_compression_usage(): array
    {
        $pg = \PostgreSQL::pdo();
        $query = (new \Daphnie\Connector($pg))->vendor()->getCompressionStats();
        $res = $pg->query($query);
        if (!$res) {
            return [];
        }

        return $this->humanizeByteColumns(
            (array)array_get($res->fetchAll(\PDO::FETCH_ASSOC), 0, [])
        );
    }

    /**
     * Get metric usage
     *
     * @return array first result row with *_bytes columns formatted, empty on failure
     */
    public function db_usage(): array
    {
        $pg = \PostgreSQL::pdo();
        $query = (new \Daphnie\Connector($pg))->vendor()->databaseUsage();
        $res = $pg->query($query);
        if (!$res) {
            return [];
        }

        // default to [] so an empty result set does not iterate over null
        return $this->humanizeByteColumns(
            (array)array_get($res->fetchAll(\PDO::FETCH_ASSOC), 0, [])
        );
    }

    /**
     * Run every column whose name ends in "_bytes" through Formatter::changeBytes()
     *
     * @param array $record column => value map
     * @return array record with byte columns replaced
     */
    private function humanizeByteColumns(array $record): array
    {
        foreach ($record as $column => $value) {
            if (substr($column, -6) === '_bytes') {
                $record[$column] = \Formatter::changeBytes($value);
            }
        }

        return $record;
    }

    /**
     * Decompress all chunks
     *
     * Note: reinitialize_compression() must be called after this
     *
     * @return bool
     */
    public function decompress_all(): bool
    {
        $pg = \PostgreSQL::pdo();
        $chunker = new \Daphnie\Chunker($pg);

        return $chunker->decompressAll();
    }

    /**
     * Reinitialize suspended compression
     *
     * Resumes each job reported by the chunker, stopping at the first failure
     *
     * @return bool false as soon as a job cannot be resumed
     */
    public function reinitialize_compression(): bool
    {
        $pg = \PostgreSQL::pdo();
        $chunker = new \Daphnie\Chunker($pg);
        foreach ($chunker->getJobs() as $job) {
            if (!$chunker->resumeJob($job['job_id'])) {
                return false;
            }
        }

        return true;
    }

    /**
     * Log every anonymous collection exposed by the collector
     *
     * @return void
     */
    public function collect(): void
    {
        $collector = $this->getCollector();
        foreach ($collector->getAnonymousCollections() as $collection) {
            $collection->log($collector);
        }
    }

    /**
     * TimescaleDB version
     *
     * @return string|null null when telemetry is disabled
     */
    public function version(): ?string
    {
        if (!TELEMETRY_ENABLED) {
            return null;
        }

        return (new \Daphnie\Connector(\PostgreSQL::pdo()))->getVersion();
    }

    /**
     * Periodic task: collect metrics, touch the cached configuration and
     * snapshot the job daemon
     *
     * @param Cronus $cron
     */
    public function _cron(Cronus $cron)
    {
        $this->collect();
        /**
         * Prevent losing configuration settings in allkeys-lru purge
         */
        $cache = \Cache_Global::spawn();
        $cache->get(CONFIGURATION_KEY);
        \Lararia\JobDaemon::snapshot();
    }
}