1: <?php declare(strict_types=1);
2:
3: use Daphnie\Collector;
4: use Module\Skeleton\Contracts\Tasking;
5:
6: /**
7: * Copyright (C) Apis Networks, Inc - All Rights Reserved.
8: *
9: * Unauthorized copying of this file, via any medium, is
10: * strictly prohibited without consent. Any dissemination of
11: * material herein is prohibited.
12: *
13: * For licensing inquiries email <licensing@apisnetworks.com>
14: *
15: * Written by Matt Saladna <matt@apisnetworks.com>, May 2017
16: */
17:
class Telemetry_Module extends Module_Skeleton implements Tasking
{
    /**
     * Largest grouping size accepted by interval().
     *
     * The original bound was written as 2e31 (2 * 10^31), a float no PHP
     * integer can exceed, so the check never fired; 2^31 is presumably
     * what was intended.
     */
    private const MAX_INTERVAL_SIZE = 2 ** 31;

    /**
     * @var Collector|null collector instance, created on first use by getCollector()
     */
    private $collector;

    protected $exportedFunctions = [
        '*'         => PRIVILEGE_ADMIN,
        'metrics'   => PRIVILEGE_SITE | PRIVILEGE_ADMIN,
        'enabled'   => PRIVILEGE_SITE | PRIVILEGE_ADMIN,
        'get'       => PRIVILEGE_ADMIN | PRIVILEGE_SITE,
        'range'     => PRIVILEGE_SITE | PRIVILEGE_ADMIN,
        'histogram' => PRIVILEGE_SITE | PRIVILEGE_ADMIN,
        'has'       => PRIVILEGE_SITE | PRIVILEGE_ADMIN,
        'interval'  => PRIVILEGE_SITE | PRIVILEGE_ADMIN
    ];

    /**
     * When telemetry is disabled only enabled() stays callable;
     * every other method is mapped to PRIVILEGE_NONE.
     */
    public function __construct()
    {
        parent::__construct();
        if (!TELEMETRY_ENABLED) {
            $this->exportedFunctions = [
                'enabled' => PRIVILEGE_SITE | PRIVILEGE_ADMIN,
                '*'       => PRIVILEGE_NONE
            ];
        }
    }

    /**
     * Telemetry is enabled
     *
     * For admin this reflects TELEMETRY_ENABLED alone. For other callers
     * the "metrics" => "enabled" service value must be truthy as well.
     *
     * @return bool
     */
    public function enabled(): bool
    {
        if ($this->permission_level & PRIVILEGE_ADMIN) {
            return (bool)TELEMETRY_ENABLED;
        }

        return TELEMETRY_ENABLED && (bool)$this->getServiceValue('metrics', 'enabled');
    }

    /**
     * Get latest metric value
     *
     * Performs a partial scan in last 12 hours
     *
     * @param string|array $metric  metric name (dot notation)
     * @param int|null     $site_id site to query; site/user callers are confined to their own site
     * @return int|int[]|null null when a foreign site ID is requested
     */
    public function get($metric, ?int $site_id = null)
    {
        if (!$this->constrainSiteId($site_id)) {
            return null;
        }

        return $this->getCollector()->get($metric, $site_id);
    }

    /**
     * Metric exists
     *
     * @param string $metric
     * @return bool true when \Daphnie\MetricBroker::resolve() yields a non-null result
     */
    public function has(string $metric): bool
    {
        return null !== \Daphnie\MetricBroker::resolve($metric);
    }

    /**
     * Get metric range
     *
     * @param string|array $metric   metric name, passed to the collector as-is
     * @param int          $begin    when negative, now minus $begin
     * @param int|null     $end
     * @param int|null     $site_id  site to query; site/user callers are confined to their own site
     * @param string|bool  $summable sum (bool) or interval ranges to sum as (string)
     * @return int[]|int|null null when a foreign site ID is requested
     */
    public function range($metric, int $begin, ?int $end = null, ?int $site_id = null, $summable = true)
    {
        if (!$this->constrainSiteId($site_id)) {
            return null;
        }

        return $this->getCollector()->range($metric, $begin, $end, $site_id, $summable);
    }

    /**
     * Aggregate metrics into periodic intervals
     *
     * @param string|int $metric  metric name or ID
     * @param int        $begin   begin timestamp
     * @param int|null   $end     ending timestamp
     * @param int|null   $site_id site ID; site/user callers are confined to their own site
     * @param int        $size    interval size, within [1, 2^31]
     * @return mixed null when a foreign site ID is requested, the result of
     *               error() when $size is out of bounds
     */
    public function interval($metric, int $begin, ?int $end = null, $site_id = null, int $size = 86400)
    {
        if (!$this->constrainSiteId($site_id)) {
            return null;
        }

        if ($size < 1 || $size > self::MAX_INTERVAL_SIZE) {
            return error("Invalid grouping size");
        }

        if ($size < CRON_RESOLUTION) {
            // not fatal: the query still runs, the caller is only warned
            warn("Interval smaller than [cron] => resolution time %d seconds", CRON_RESOLUTION);
        }

        return $this->getCollector()->interval($metric, $begin, $end, $site_id, $size);
    }

    /**
     * Get metric histogram
     *
     * @param string|array $metric  metric name, passed to the collector as-is
     * @param int          $begin
     * @param int|null     $end
     * @param int|null     $site_id site to query; site/user callers are confined to their own site
     * @param int          $buckets number of buckets to bin, within [2,50]
     * @param int          $min     minimum value to include
     * @param int          $max     maximum value to include
     * @return int[]|int|null null when a foreign site ID is requested or $min exceeds $max
     */
    public function histogram($metric, int $begin, ?int $end = null, ?int $site_id = null, int $buckets = 5, int $min = 0, int $max = 1024)
    {
        if (!$this->constrainSiteId($site_id)) {
            return null;
        }

        if ($buckets > 50 || $buckets < 2) {
            return error("Buckets must be within [2,50]");
        }

        // $min and $max are non-nullable ints, no null guard is needed
        if ($min > $max) {
            error("Minimum value %(min)d may not exceed %(max)d", [
                'min' => $min,
                'max' => $max
            ]);

            return null;
        }

        return $this->getCollector()->histogram($metric, $begin, $end, $site_id, $buckets, $min, $max);
    }

    /**
     * Confine a requested site ID to the caller's own site
     *
     * Callers holding PRIVILEGE_USER or PRIVILEGE_SITE may only query
     * their own site: a truthy $site_id that differs from the session's
     * site ID is rejected, otherwise $site_id is overwritten with the
     * session's site ID. Any other caller keeps $site_id untouched.
     *
     * @param int|null $site_id requested site ID, updated in place
     * @return bool false when a foreign site ID was requested
     */
    private function constrainSiteId(&$site_id): bool
    {
        if (!($this->permission_level & (PRIVILEGE_USER | PRIVILEGE_SITE))) {
            return true;
        }

        if ($site_id && $site_id !== $this->site_id) {
            error('Cannot specify site ID');

            return false;
        }

        $site_id = $this->site_id;

        return true;
    }

    /**
     * Get collector instance
     *
     * Built once from \PostgreSQL::pdo() and reused afterwards.
     *
     * @return Collector
     */
    private function getCollector(): Collector
    {
        if (!isset($this->collector)) {
            $this->collector = new Collector(\PostgreSQL::pdo());
        }

        return $this->collector;
    }

    /**
     * Drop metric value from database
     *
     * Deletes rows matching the metric's attribute ID from "metrics", or
     * from "metric_attributes" when $rekey is set.
     *
     * @param string $metric metric to discard
     * @param bool   $rekey  rekey attribute metadata on next run
     * @return bool false when the metric is unknown or the delete fails
     */
    public function drop_metric(string $metric, bool $rekey = false): bool
    {
        $id = $this->getCollector()->metricAsId($metric);
        if (null === $id) {
            return false;
        }

        $db = \PostgreSQL::pdo();
        $table = $rekey ? 'metric_attributes' : 'metrics';
        $chunker = new \Daphnie\Chunker($db);
        $chunker->decompressRange(null);

        $stmt = false;
        $ret = false;
        try {
            // $table is one of two literals above, never caller input
            $stmt = $db->prepare("DELETE FROM $table WHERE attr_id = :attr_id");
            $ret = $stmt !== false && $stmt->execute([':attr_id' => $id]);
        } finally {
            // release() followed the delete unconditionally before;
            // keep that true when prepare()/execute() throws
            $chunker->release();
        }

        if ($ret) {
            return true;
        }

        // prepare() may have failed before a statement existed
        $info = $stmt !== false ? $stmt->errorInfo() : $db->errorInfo();

        return error('Failed to drop metric %(metric)s: %(err)s',
            ['metric' => $metric, 'err' => array_get($info, 2, '')]
        );
    }

    /**
     * Timescale chunk statistics
     *
     * @return array
     */
    public function chunks(): array
    {
        return (new \Daphnie\Chunker(\PostgreSQL::pdo()))->getChunkStats();
    }

    /**
     * Get all metric symbols
     *
     * @param bool $extended return the collector's full metric map instead of only its keys
     * @return array
     */
    public function metrics(bool $extended = false): array
    {
        $metrics = $this->getCollector()->all();

        return $extended ? $metrics : array_keys($metrics);
    }

    /**
     * Get metric compression usage
     *
     * @return array first result row with "*_bytes" fields formatted, empty on failure
     */
    public function db_compression_usage(): array
    {
        $pg = \PostgreSQL::pdo();
        $query = (new \Daphnie\Connector($pg))->vendor()->getCompressionStats();

        return $this->fetchUsageRecord($pg, $query);
    }

    /**
     * Get metric usage
     *
     * @return array first result row with "*_bytes" fields formatted, empty on failure
     */
    public function db_usage(): array
    {
        $pg = \PostgreSQL::pdo();
        $query = (new \Daphnie\Connector($pg))->vendor()->databaseUsage();

        return $this->fetchUsageRecord($pg, $query);
    }

    /**
     * Run a usage query and format its first row
     *
     * Every field whose name ends in "_bytes" is passed through
     * \Formatter::changeBytes().
     *
     * @param mixed $pg    handle returned by \PostgreSQL::pdo()
     * @param mixed $query query produced by the vendor connector
     * @return array empty when the query fails or yields no rows
     */
    private function fetchUsageRecord($pg, $query): array
    {
        $res = $pg->query($query);
        if (!$res) {
            return [];
        }

        // default to [] so an empty result set is not iterated as null
        $rec = (array)array_get($res->fetchAll(\PDO::FETCH_ASSOC), 0, []);

        foreach ($rec as $k => $v) {
            if (substr((string)$k, -6) === '_bytes') {
                $rec[$k] = \Formatter::changeBytes($v);
            }
        }

        return $rec;
    }

    /**
     * Decompress all chunks
     *
     * Note: reinitialize_compression() must be called after this
     *
     * @return bool
     */
    public function decompress_all(): bool
    {
        return (new \Daphnie\Chunker(\PostgreSQL::pdo()))->decompressAll();
    }

    /**
     * Reinitialize suspended compression
     *
     * Resumes each job reported by the chunker, stopping at the first
     * job that fails to resume.
     *
     * @return bool
     */
    public function reinitialize_compression(): bool
    {
        $chunker = new \Daphnie\Chunker(\PostgreSQL::pdo());
        foreach ($chunker->getJobs() as $job) {
            if (!$chunker->resumeJob($job['job_id'])) {
                return false;
            }
        }

        return true;
    }

    /**
     * Log every anonymous collection known to the collector
     *
     * @return void
     */
    public function collect(): void
    {
        $collector = $this->getCollector();
        foreach ($collector->getAnonymousCollections() as $collection) {
            $collection->log($collector);
        }
    }

    /**
     * TimescaleDB version
     *
     * @return string|null null when telemetry is disabled
     */
    public function version(): ?string
    {
        if (!TELEMETRY_ENABLED) {
            return null;
        }

        return (new \Daphnie\Connector(\PostgreSQL::pdo()))->getVersion();
    }

    /**
     * Periodic task: collect metrics, touch the cached configuration
     * and snapshot the job daemon
     *
     * @param Cronus $cron
     */
    public function _cron(Cronus $cron)
    {
        $this->collect();
        /**
         * Prevent losing configuration settings in allkeys-lru purge
         */
        $cache = \Cache_Global::spawn();
        $cache->get(CONFIGURATION_KEY);
        \Lararia\JobDaemon::snapshot();
    }
}