1: <?php declare(strict_types=1);
2:
3: use Daphnie\Collector;
4: use Module\Skeleton\Contracts\Tasking;
5:
6: /**
7: * Copyright (C) Apis Networks, Inc - All Rights Reserved.
8: *
9: * Unauthorized copying of this file, via any medium, is
10: * strictly prohibited without consent. Any dissemination of
11: * material herein is prohibited.
12: *
13: * For licensing inquiries email <licensing@apisnetworks.com>
14: *
15: * Written by Matt Saladna <matt@apisnetworks.com>, May 2017
16: */
17:
18: class Telemetry_Module extends Module_Skeleton implements Tasking
19: {
20: // @var Collector
21: private $collector;
22:
23: protected $exportedFunctions = [
24: '*' => PRIVILEGE_ADMIN,
25: 'metrics' => PRIVILEGE_SITE | PRIVILEGE_ADMIN,
26: 'enabled' => PRIVILEGE_SITE | PRIVILEGE_ADMIN,
27: 'get' => PRIVILEGE_ADMIN | PRIVILEGE_SITE,
28: 'range' => PRIVILEGE_SITE | PRIVILEGE_ADMIN,
29: 'histogram' => PRIVILEGE_SITE | PRIVILEGE_ADMIN,
30: 'has' => PRIVILEGE_SITE | PRIVILEGE_ADMIN,
31: 'interval' => PRIVILEGE_SITE | PRIVILEGE_ADMIN
32: ];
33:
34: public function __construct()
35: {
36: parent::__construct();
37: if (!TELEMETRY_ENABLED) {
38: $this->exportedFunctions = [
39: 'enabled' => PRIVILEGE_SITE | PRIVILEGE_ADMIN,
40: '*' => PRIVILEGE_NONE
41: ];
42: }
43: }
44:
45: /**
46: * Telemetry is enabled
47: *
48: * @return bool
49: */
50: public function enabled(): bool
51: {
52: if ($this->permission_level & PRIVILEGE_ADMIN) {
53: return (bool)TELEMETRY_ENABLED;
54: }
55:
56: return TELEMETRY_ENABLED && (bool)$this->getServiceValue('metrics', 'enabled');
57: }
58:
59: /**
60: * Get latest metric value
61: *
62: * Performs a partial scan in last 12 hours
63: *
64: * @param string|array $metric metric name (dot notation)
65: * @param int|null $site_id
66: * @return int|int[]|null
67: */
68: public function get($metric, int $site_id = null)
69: {
70: if ($this->permission_level & (PRIVILEGE_USER | PRIVILEGE_SITE)) {
71: if ($site_id && $site_id !== $this->site_id) {
72: error('Cannot specify site ID');
73: return null;
74: }
75: $site_id = $this->site_id;
76: }
77:
78: return $this->getCollector()->get($metric, $site_id);
79: }
80:
81: /**
82: * Metric exists
83: *
84: * @param string $metric
85: * @return bool
86: */
87: public function has(string $metric): bool
88: {
89: return null !== \Daphnie\MetricBroker::resolve($metric);
90: }
91:
92: /**
93: * Get metric range
94: *
95: * @param $metric
96: * @param int $begin when negative, now minus $begin
97: * @param int|null $end
98: * @param int|null $site_id
99: * @param string|bool $summable sum (bool) or interval ranges to sum as (string)
100: * @return int[]|int|null
101: */
102: public function range($metric, int $begin, ?int $end = null, int $site_id = null, $summable = true)
103: {
104: if ($this->permission_level & (PRIVILEGE_USER | PRIVILEGE_SITE)) {
105: if ($site_id && $site_id !== $this->site_id) {
106: error('Cannot specify site ID');
107:
108: return null;
109: }
110: $site_id = $this->site_id;
111: }
112:
113: return $this->getCollector()->range($metric, $begin, $end, $site_id, $summable);
114: }
115:
116: /**
117: * Aggregate metrics into periodic intervals
118: *
119: * @param string|int $metric metric name or ID
120: * @param int $begin begin timestamp
121: * @param int|null $end ending timestamp
122: * @param int|null $site_id site ID
123: * @param int $size interval size
124: * @return mixed
125: */
126: public function interval($metric, int $begin, ?int $end = null, $site_id = null, int $size = 86400)
127: {
128: if ($this->permission_level & (PRIVILEGE_USER | PRIVILEGE_SITE)) {
129: if ($site_id && $site_id !== $this->site_id) {
130: error('Cannot specify site ID');
131:
132: return null;
133: }
134: $site_id = $this->site_id;
135: }
136:
137: if ($size < 1 || $size > 2e31) {
138: return error("Invalid grouping size");
139: }
140:
141: if ($size < CRON_RESOLUTION) {
142: warn("Interval smaller than [cron] => resolution time %d seconds", CRON_RESOLUTION);
143: }
144:
145: return $this->getCollector()->interval($metric, $begin, $end, $site_id, $size);
146:
147: }
148:
149: /**
150: * Get metric histogram
151: *
152: * @param $metric
153: * @param int $begin
154: * @param int|null $end
155: * @param int|null $site_id
156: * @param int $buckets number of buckets to bin
157: * @param int|null $min minimum value to include
158: * @param int|null $max maximum value to include
159: * @return int[]|int|null
160: */
161: public function histogram($metric, int $begin, ?int $end = null, int $site_id = null, int $buckets = 5, int $min = 0, int $max = 1024)
162: {
163: if ($this->permission_level & (PRIVILEGE_USER | PRIVILEGE_SITE)) {
164: if ($site_id && $site_id !== $this->site_id) {
165: error('Cannot specify site ID');
166:
167: return null;
168: }
169: $site_id = $this->site_id;
170: }
171:
172: if ($buckets > 50 || $buckets < 2) {
173: return error("Buckets must be within [2,50]");
174: }
175:
176: if ($min !== null && $max !== null && $min > $max) {
177: error("Minimum value %(min)d may not exceed %(max)d", [
178: 'min' => $min,
179: 'max' => $max
180: ]);
181:
182: return null;
183: }
184:
185: return $this->getCollector()->histogram($metric, $begin, $end, $site_id, $buckets, $min, $max);
186:
187: }
188:
189: /**
190: * Get collector instance
191: *
192: * @return Collector
193: */
194: private function getCollector(): Collector
195: {
196: if (!isset($this->collector)) {
197: $this->collector = new Collector(\PostgreSQL::pdo());
198: }
199:
200: return $this->collector;
201: }
202:
203: /**
204: * Drop metric value from database
205: *
206: * @param string $metric metric to discard
207: * @param bool $rekey rekey attribute metadata on next run
208: * @return bool
209: */
210: public function drop_metric(string $metric, bool $rekey = false): bool
211: {
212: if (null === ($id = $this->getCollector()->metricAsId($metric))) {
213: return false;
214: }
215:
216: $db = \PostgreSQL::pdo();
217:
218: $table = $rekey ? 'metric_attributes' : 'metrics';
219: $chunker = new \Daphnie\Chunker($db);
220: $chunker->decompressRange(null);
221: $stmt = $db->prepare("DELETE FROM $table WHERE attr_id = :attr_id");
222: $ret = $stmt->execute([':attr_id' => $id]);
223: $chunker->release();
224: return $ret ?: error('Failed to drop metric %(metric)s: %(err)s',
225: ['metric' => $metric, 'err' => array_get($stmt->errorInfo(), 2, '')]
226: );
227: }
228:
229: /**
230: * Timescale chunk statistics
231: *
232: * @return array
233: */
234: public function chunks(): array
235: {
236: return (new \Daphnie\Chunker(\PostgreSQL::pdo()))->getChunkStats();
237: }
238:
239: /**
240: * Get all metric symbols
241: *
242: * @return array
243: */
244: public function metrics(): array
245: {
246: return array_keys($this->getCollector()->all());
247: }
248:
249: /**
250: * Get metric compression usage
251: *
252: * @return array
253: */
254: public function db_compression_usage(): array {
255: $pg = PostgreSQL::pdo();
256: $query = (new \Daphnie\Connector($pg))->vendor()->getCompressionStats();
257: $res = $pg->query($query);
258: if (!$res) {
259: return [];
260: }
261:
262: $rec = array_get($res->fetchAll(\PDO::FETCH_ASSOC), 0, []);
263:
264: foreach ($rec as $k => $v) {
265: if (substr($k, -6) === '_bytes') {
266: $rec[$k] = \Formatter::changeBytes($v);
267: }
268: }
269:
270: return (array)$rec;
271: }
272:
273: /**
274: * Get metric usage
275: *
276: * @return array
277: */
278: public function db_usage(): array
279: {
280: $pg = PostgreSQL::pdo();
281: $query = (new \Daphnie\Connector($pg))->vendor()->databaseUsage();
282: $res = $pg->query($query);
283: if (!$res) {
284: return [];
285: }
286:
287: $rec = array_get($res->fetchAll(\PDO::FETCH_ASSOC), 0);
288:
289: foreach ($rec as $k => $v) {
290: if (substr($k, -6) === '_bytes') {
291: $rec[$k] = \Formatter::changeBytes($v);
292: }
293: }
294:
295: return (array)$rec;
296: }
297:
298: /**
299: * Decompress all chunks
300: *
301: * Note: reinitialize_compression() must be called after this
302: *
303: * @return bool
304: */
305: public function decompress_all(): bool
306: {
307: $pg = PostgreSQL::pdo();
308: $chunker = new \Daphnie\Chunker($pg);
309:
310: return $chunker->decompressAll();
311: }
312:
313: /**
314: * Reinitialize suspended compression
315: *
316: * @return bool
317: */
318: public function reinitialize_compression(): bool
319: {
320: $pg = PostgreSQL::pdo();
321: $chunker = new \Daphnie\Chunker($pg);
322: foreach ($chunker->getJobs() as $job) {
323: if (!$chunker->resumeJob($job['job_id'])) {
324: return false;
325: }
326: }
327:
328: return true;
329: }
330:
331: public function collect(): void
332: {
333: $collector = $this->getCollector();
334: foreach ($collector->getAnonymousCollections() as $collection) {
335: $collection->log($collector);
336: }
337:
338: }
339:
340: /**
341: * TimescaleDB version
342: *
343: * @return string|null
344: */
345: public function version(): ?string
346: {
347: if (!TELEMETRY_ENABLED) {
348: return null;
349: }
350:
351: return (new \Daphnie\Connector(\PostgreSQL::pdo()))->getVersion();
352: }
353:
354: public function _cron(Cronus $cron)
355: {
356: $this->collect();
357: /**
358: * Prevent losing configuration settings in allkeys-lru purge
359: */
360: $cache = \Cache_Global::spawn();
361: $cache->get(CONFIGURATION_KEY);
362: \Lararia\JobDaemon::snapshot();
363: }
364: }