/******************************************************************************/
/*                                                                            */
/*                        X r d R m c D a t a . c c                           */
/*                                                                            */
/* (c) 2019 by the Board of Trustees of the Leland Stanford, Jr., University  */
/*                            All Rights Reserved                             */
/* Produced by Andrew Hanushevsky for Stanford University under contract     */
/*             DE-AC02-76-SFO0515 with the Department of Energy              */
/*                                                                            */
/* This file is part of the XRootD software suite.                            */
/*                                                                            */
/* XRootD is free software: you can redistribute it and/or modify it under    */
/* the terms of the GNU Lesser General Public License as published by the     */
/* Free Software Foundation, either version 3 of the License, or (at your     */
/* option) any later version.                                                 */
/*                                                                            */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT      */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or      */
/* FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public       */
/* License for more details.                                                  */
/*                                                                            */
/* You should have received a copy of the GNU Lesser General Public License   */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file  */
/* COPYING (GPL license).  If not, see <http://www.gnu.org/licenses/>.        */
/*                                                                            */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without  */
/* specific prior written permission of the institution or contributor.       */
/******************************************************************************/

#include <cstdio>
#include <cstring>

#include "XrdRmc/XrdRmc.hh"
#include "XrdRmc/XrdRmcData.hh"

/******************************************************************************/
/*                           C o n s t r u c t o r                            */
/******************************************************************************/

XrdRmcData::XrdRmcData(XrdRmcReal *cP, XrdOucCacheIO *ioP,
                       long long vn, int opts)
           : pPLock(0), rPLock(0), wPLock(0),
             Cache(cP), ioObj(ioP), VNum(vn)
{
// We need to map the cache options to our local options
//
   isFIS = (opts & XrdOucCache::optFIS ? 1 : 0);
   isRW  = (opts & XrdOucCache::optRW  ? okRW : 0);

// Copy some values from the cache to our local area for convenience
//
   SegShft  = Cache->SegShft;
   OffMask  = Cache->OffMask;
   SegSize  = Cache->SegSize;
   maxCache = Cache->maxCache;
   Debug    = Cache->Dbg;

// Initialize the pre-read area
//
   memset(prRR,  -1, sizeof(prRR));
   memset(prBeg, -1, sizeof(prBeg));
   memset(prEnd, -1, sizeof(prEnd));
   memset(prOpt,  0, sizeof(prOpt));

   prNSS    = -1;
   prRRNow  = 0;
   prStop   = 0;
   prNext   = prFree = 0;
   prActive = 0;
   prOK     = (Cache->prNum ? 1 : 0);
   prReq.Data = this;
   prAuto   = (prOK ? setAPR(Apr, Cache->aprDefault, SegSize) : 0);
   prPerf   = 0;
   prCalc   = Apr.prRecalc;

// Establish serialization options
//
   if (Cache->Options & XrdRmc::ioMTSafe) pPLopt = rPLopt = xs_Shared;
      else pPLopt = rPLopt = xs_Exclusive;

// Establish serialization handling (only needed for r/w files)
//
   if (Cache->Options & XrdRmc::Serialized)
      {if (Cache->Options & XrdRmc::ioMTSafe)
          {if (isRW && prOK) pPLock = wPLock = &rwLock;}
          else if (prOK) rPLock = pPLock = wPLock = &rwLock;
      } else if (!(Cache->Options & XrdRmc::ioMTSafe) || isRW)
                rPLock = pPLock = wPLock = &rwLock;
}
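
// Illustration (values assumed for this example, not mandated by the code):
// with a 64KB segment size, SegShft would be 16 and OffMask 0xFFFF, so an
// offset of 1048580 maps to segment 16 at intra-segment offset 4. All three
// values are simply copied from the owning XrdRmcReal object above.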

/******************************************************************************/
/*                                D e t a c h                                 */
/******************************************************************************/

bool XrdRmcData::Detach(XrdOucCacheIOCD &iocd)
{
   int delOK;

// We must wait for any pre-reads to stop at this point. TO DO: We really
// should run this in a separate thread and use the callback mechanism.
//
   DMutex.Lock();
   if (prActive)
      {XrdSysSemaphore prDone(0);
       prStop = &prDone;
       DMutex.UnLock();
       prDone.Wait();
       DMutex.Lock();
      }
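
// Note: the preread agent (see Preread() below) checks prStop on each pass
// and posts the semaphore once it has quiesced, so this wait ends when the
// agent finishes the work currently in its queue.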

// Get exclusive control
//
   rwLock.Lock(xs_Exclusive);

// We can now detach ourselves from the cache
//
   delOK = Cache->Detach(ioObj);
   DMutex.UnLock();
   rwLock.UnLock(xs_Exclusive);

// Check if we should delete ourselves and if so add our stats to the cache
//
   if (delOK)
      {Cache->Statistics.Add(Statistics);
       if (Cache->Lgs)
          {char sBuff[4096];
           snprintf(sBuff, sizeof(sBuff),
                    "Cache: Stats: %lld Read; %lld Get; %lld Pass; "
                    "%lld Write; %lld Put; %lld Hits; %lld Miss; "
                    "%lld pead; %lld HitsPR; %lld MissPR; Path %s\n",
                    Statistics.X.BytesRead, Statistics.X.BytesGet,
                    Statistics.X.BytesPass, Statistics.X.BytesWrite,
                    Statistics.X.BytesPut,
                    Statistics.X.Hits, Statistics.X.Miss,
                    Statistics.X.BytesPead,
                    Statistics.X.HitsPR, Statistics.X.MissPR,
                    ioObj->Path());
           std::cerr <<sBuff;
          }
       delete this;
       return true;
      }

// TO DO: We should issue a message here as this will cause a memory leak
// as we won't try to do the detach again.
//
   return false;
}

/******************************************************************************/
/*                               P r e r e a d                                */
/******************************************************************************/

void XrdRmcData::Preread()
{
   MrSw EnforceMrSw(pPLock, pPLopt);
   long long segBeg, segEnd;
   int oVal, pVal = 0, rLen, noIO, bPead = 0, prPages = 0;
   char *cBuff;

// Check if we are stopping, if so, ignore this request
//
   DMutex.Lock();
   if (prStop)
      {prActive = 0;
       prStop->Post();
       DMutex.UnLock();
       return;
      }

// Do the next pre-read in the queue (it's possible another may get in)
//
do{if ((oVal = prOpt[prNext]))
      {segBeg = prBeg[prNext]; segEnd = prEnd[prNext];
       prOpt[prNext++] = 0;
       if (prNext >= prMax) prNext = 0;
       if (oVal == prSKIP) continue;
       prActive = prRun;
       if (Debug > 1) std::cerr <<"prD: beg " <<(VNum >> XrdRmcReal::Shift) <<' '
                         <<(segEnd-segBeg+1)*SegSize <<'@' <<(segBeg*SegSize)
                         <<" f=" <<int(oVal) <<' ' <<ioObj->Path() <<std::endl;
       DMutex.UnLock();
       oVal = (oVal == prSUSE ? XrdRmcSlot::isSUSE : 0) | XrdRmcSlot::isNew;
       segBeg |= VNum; segEnd |= VNum;
       do {if ((cBuff = Cache->Get(ioObj, segBeg, rLen, noIO)))
              {if (noIO) pVal = 0;
                  else {pVal = oVal; bPead += rLen; prPages++;}
              }
          } while(cBuff && Cache->Ref(cBuff, 0, pVal) && segBeg++ < segEnd);
       if (Debug > 1) std::cerr <<"PrD: end " <<(VNum >> XrdRmcReal::Shift)
                                <<' ' <<prPages <<" pgs " <<bPead <<std::endl;
       if (bPead)
          {Statistics.Lock();
           Statistics.X.BytesPead += bPead;
           Statistics.X.MissPR    += prPages;
           Statistics.UnLock();
          }
       DMutex.Lock();
      }
  } while(oVal);

// See if we should schedule the next preread or stop
//
   if (prStop)
      {prActive = 0;
       prStop->Post();
      } else if (prOpt[prNext])
                {prActive = prWait;
                 Cache->PreRead(&prReq);
                } else prActive = 0;

// All done here
//
   DMutex.UnLock();
}

/******************************************************************************/

void XrdRmcData::Preread(long long Offs, int rLen, int Opts)
{
   int How;

// Determine how to place the pages. We do this via assignment to avoid a gcc
// bug that doesn't optimize out static const int's except via assignment.
//
   if (Opts & SingleUse) How = prSUSE;
      else How = prLRU;

// Verify that this preread will succeed and, if so, schedule it
//
   if (prOK && rLen > 0 && Offs > 0
   &&  Offs < XrdRmcReal::MaxFO && (Offs + rLen) < XrdRmcReal::MaxFO)
      QueuePR(Offs>>SegShft, rLen, How);
}
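
// Illustration (hypothetical call, not part of this file): a caller that
// expects to read 1MB sequentially from offset 0 could stage the pages with
// Preread(0, 1048576, 0) for normal LRU placement; passing SingleUse instead
// marks the staged pages for single use so they can be reclaimed promptly.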

/******************************************************************************/

void XrdRmcData::Preread(aprParms &Parms)
{

// Establish the new feature set if prereads are enabled
//
   if (prOK)
      {DMutex.Lock();
       prAuto = setAPR(Apr, Parms, SegSize);
       DMutex.UnLock();
      }
}

/******************************************************************************/
/*                               Q u e u e P R                                */
/******************************************************************************/

void XrdRmcData::QueuePR(long long segBeg, int rLen, int prHow, int isAuto)
{
   XrdSysMutexHelper Monitor(&DMutex);
   long long segCnt, segEnd;
   int i;

// Scuttle everything if we are stopping
//
   if (Debug) std::cerr <<"prQ: req " <<rLen <<'@' <<(segBeg*SegSize) <<std::endl;
   if (prStop) return;

// Verify that this offset is not in the table of recent offsets
//
   for (i = 0; i < prRRMax; i++) if (prRR[i] == segBeg) return;

// Compute number of pages to preread. If none, we are done.
//
   segCnt = rLen/SegSize + ((rLen & OffMask) != 0);
   if (prHow == prLRU)
      {if (segCnt < Apr.minPages) segCnt = Apr.minPages;
       if (!segCnt) return;
      }

// Compute last segment to read
//
   segEnd = segBeg + segCnt - 1;

// Run through the preread queue and check if we have this block scheduled or
// we completed the block in the recent past. We do not catch overlapping
// prereads; they will need to go through the standard fault mechanism.
//
   for (i = 0; i < prMax; i++)
       if (segBeg == prBeg[i] || (segBeg > prBeg[i] && segEnd <= prEnd[i]))
          {if (prHow == prSKIP)
              {if (Debug) std::cerr <<"pDQ: " <<rLen <<'@' <<(segBeg*SegSize)
                                    <<std::endl;
               prOpt[i] = prSKIP;
              }
           return;
          }

// Return if this is a cancellation request
//
   if (prHow == prSKIP) return;

// At this point check if we need to recalculate stats
//
   if (prAuto && prCalc && Statistics.X.BytesPead > prCalc)
      {int crPerf;
       Statistics.Lock();
       prCalc = Statistics.X.BytesPead + Apr.prRecalc;
       crPerf = (Statistics.X.MissPR?
                (Statistics.X.HitsPR*100)/Statistics.X.MissPR : 0);
       Statistics.UnLock();
       if (Debug) std::cerr <<"PrD: perf " <<crPerf <<"% " <<ioObj->Path()
                            <<std::endl;
       if (prPerf >= 0)
          {if ( crPerf < Apr.minPerf && prPerf < Apr.minPerf
            &&  (crPerf <= prPerf || crPerf <= prPerf*2))
              {if (Debug) std::cerr <<"PrD: Disabled for " <<ioObj->Path()
                                    <<std::endl;
               prAuto = 0;
               if (isAuto) return;
              }
          }
       prPerf = crPerf;
      }
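
// Worked example (numbers invented for illustration): if 30 preread pages
// were later referenced (HitsPR) and 100 never were (MissPR), crPerf is 30%.
// If that falls below Apr.minPerf in both the current and previous samples
// and shows no improvement, automatic prereads are disabled for this file.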
319
320// Add this read to the queue
321//
322 if (prFree == prNext && prOpt[prNext]) prNext = (prNext+1)%prMax;
323 prBeg[prFree] = segBeg; prEnd[prFree] = segEnd;
324 prOpt[prFree++] = prHow;
325 if (prFree >= prMax) prFree = 0;
326
327// If nothing pending then activate a preread
328//
329 if (Debug) std::cerr <<"prQ: add " <<rLen <<'@' <<(segBeg*SegSize) <<std::endl;
330 if (!prActive) {prActive = prWait; Cache->PreRead(&prReq);}
331}

/******************************************************************************/
/*                                  R e a d                                   */
/******************************************************************************/

int XrdRmcData::Read(char *Buff, long long Offs, int rLen)
{
   MrSw EnforceMrSw(rPLock, rPLopt);
   XrdOucCacheStats Now;
   char *cBuff, *Dest = Buff;
   long long segOff, segNum = (Offs >> SegShft);
   int noIO, rAmt, rGot, doPR = prAuto, rLeft = rLen;

// Verify read length and offset
//
   if (rLen <= 0) return 0;
   if (XrdRmcReal::MaxFO < Offs || Offs < 0
   ||  XrdRmcReal::MaxFO < (Offs + rLen)) return -EOVERFLOW;

// Check for a preread request and determine how to place the pages.
//
   if (!Buff)
      {int How;
       if (rLen > maxCache) How = prSUSE;
          else How = prLRU;
       QueuePR(segNum, rLen, How);
       return 0;
      }

// Bypass caching if the request is too large; use the alternate read algorithm.
//
   if (rLen > maxCache) return Read(Now, Buff, Offs, rLen);

// We check now whether or not we will try to do a preread later. This is
// advisory at this point so we don't need to obtain any locks to do this.
//
   if (doPR)
      {if (rLen >= Apr.Trigger) doPR = 0;
          else for (noIO = 0; noIO < prRRMax; noIO++)
                   if (prRR[noIO] == segNum) {doPR = 0; break;}
       if (doPR)
          {DMutex.Lock();
           prRR[prRRNow] = segNum;
           prRRNow = (prRRNow+1)%prRRMax;
           DMutex.UnLock();
          }
      }
   if (Debug > 1) std::cerr <<"Rdr: " <<rLen <<'@' <<Offs <<" pr=" <<doPR
                            <<std::endl;

// Get the segment pointer, offset and the initial read amount
//
   segNum |= VNum;
   segOff  = Offs & OffMask;
   rAmt    = SegSize - segOff;
   if (rAmt > rLen) rAmt = rLen;
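
// Illustration (sizes assumed for this example): with SegSize = 65536, a
// 100000-byte read at offset 70000 starts in segment 1 at segOff 4464; the
// first copy takes rAmt = 61072 bytes from that segment, and the loop below
// faults in segment 2 for the remaining 38928 bytes.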

// Now fault the pages in
//
   while((cBuff = Cache->Get(ioObj, segNum, rGot, noIO)))
        {if (rGot <= segOff + rAmt) rAmt = (rGot <= segOff ? 0 : rGot-segOff);
         if (rAmt) {memcpy(Dest, cBuff+segOff, rAmt);
                    Dest += rAmt; Offs += rAmt; Now.X.BytesGet += rGot;
                   }
         if (noIO) {Now.X.Hits++; if (noIO < 0) Now.X.HitsPR++;}
            else   {Now.X.Miss++; Now.X.BytesRead += rAmt;}
         if (!(Cache->Ref(cBuff, (isFIS ? rAmt : 0)))) {doPR = 0; break;}
         segNum++; segOff = 0;
         if ((rLeft -= rAmt) <= 0) break;
         rAmt = (rLeft <= SegSize ? rLeft : SegSize);
        }

// Update stats
//
   Statistics.Add(Now);

// See if a preread needs to be done. We will only do this if no errors occurred
//
   if (doPR && cBuff)
      {EnforceMrSw.UnLock();
       QueuePR(segNum, rLen, prLRU, 1);
      }

// All done. If we ended fine, return the amount read; if there is no page
// buffer then the cache returned the error in the amount present variable.
//
   if (Debug > 1) std::cerr <<"Rdr: ret " <<(cBuff ? Dest-Buff : rGot) <<" hits "
                            <<Now.X.Hits <<" pr " <<Now.X.HitsPR <<std::endl;
   return (cBuff ? Dest-Buff : rGot);
}

/******************************************************************************/

int XrdRmcData::Read(XrdOucCacheStats &Now,
                     char *Buff, long long Offs, int rLen)
{
   char *cBuff, *Dest = Buff;
   long long segOff, segNum;
   int noIO, rAmt, rGot, rIO, rPend = 0, rLeft = rLen;

// Get the segment pointer, offset and the initial read amount
//
   segNum = (Offs >> SegShft) | VNum;
   segOff = Offs & OffMask;
   rAmt   = SegSize - segOff;
   if (rAmt > rLen) rAmt = rLen;
   if (Debug > 1) std::cerr <<"Rdr: " <<rLen <<'@' <<Offs <<" pr=" <<prOK
                            <<std::endl;

// Optimize here when this is R/O and prereads are disabled. Otherwise, cancel
// any pre-read for this read as we will be bypassing the cache.
//
   if (!isRW && !prOK)
      {if ((rIO = ioObj->Read(Dest, Offs, rLen)) > 0)
          Statistics.Add(Statistics.X.BytesPass, rLen);
       return rIO;
      } else if (prOK) QueuePR(Offs >> SegShft, rLen, prSKIP);

// Here we try to satisfy as much of the request as we can from the cache;
// for whatever is missing we issue the longest read possible.
//
do{if ((cBuff = Cache->Get(0, segNum, rGot, noIO)))
      {if (rPend)
          {if ((rIO = ioObj->Read(Dest, Offs, rPend)) < 0) return rIO;
           Now.X.BytesPass += rIO; Dest += rIO; Offs += rIO; rPend = 0;
          }
       if (rGot <= segOff + rAmt) rAmt = (rGot <= segOff ? 0 : rGot-segOff);
       if (rAmt) {memcpy(Dest, cBuff+segOff, rAmt);
                  Dest += rAmt; Offs += rAmt; Now.X.Hits++;
                  Now.X.BytesGet += rAmt;
                 }
       if (noIO < 0) Now.X.HitsPR++;
       if (!(Cache->Ref(cBuff, (isFIS ? rAmt : 0)))) break;
      } else rPend += rAmt;

   if ((rLeft -= rAmt) <= 0) break;
   rAmt = (rLeft <= SegSize ? rLeft : SegSize);
   segNum++; segOff = 0;
  } while(1);
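
// Illustration: if a request spans segments 3 through 5 and only segment 5
// is resident, the misses on 3 and 4 merely grow rPend; the hit on 5 first
// drains the two-segment deficit with a single ioObj->Read() and then copies
// segment 5 out of memory.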

// Finish any outstanding read
//
   if (rPend)
      {if ((rIO = ioObj->Read(Dest, Offs, rPend)) < 0) return rIO;
       Now.X.BytesPass += rIO; Dest += rIO;
      }

// Update stats and return read length
//
   if (Debug > 1) std::cerr <<"Rdr: ret " <<(Dest-Buff) <<" hits " <<Now.X.Hits
                            <<" pr " <<Now.X.HitsPR <<std::endl;
   Statistics.Add(Now);
   return Dest-Buff;
}

/******************************************************************************/
/*                                s e t A P R                                 */
/******************************************************************************/

int XrdRmcData::setAPR(aprParms &Dest, aprParms &Src, int pSize)
{

// Copy over the information
//
   Dest = Src;

// Fix it up as needed
//
   if (Dest.Trigger  <= 0) Dest.Trigger  = (Dest.minPages ? pSize + 1 : 0);
   if (Dest.prRecalc <= 0) Dest.prRecalc = (Dest.prRecalc ? 52428800 : 0);
   if (Dest.minPages <  0) Dest.minPages = 0;
   if (Dest.minPerf  <  0) Dest.minPerf  = 0;
   if (Dest.minPerf  > 100) Dest.minPerf = 100;

// Indicate whether anything can be preread
//
   return (Dest.minPages > 0 && Dest.Trigger > 1);
}
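
// Worked example (defaults assumed): with minPages = 2 and Trigger left at 0,
// Trigger becomes pSize + 1, so only reads no larger than one page arm an
// automatic preread. A negative prRecalc is replaced by 52428800 (50MB),
// meaning preread performance is re-evaluated every 50MB of preread data.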

/******************************************************************************/
/*                                 T r u n c                                  */
/******************************************************************************/

int XrdRmcData::Trunc(long long Offs)
{
   MrSw EnforceMrSw(wPLock, xs_Exclusive);

// Verify that we can modify this cache and the trunc offset
//
   if (!isRW) return -EROFS;
   if (Offs > XrdRmcReal::MaxFO || Offs < 0) return -EOVERFLOW;

// Get the segment pointer and truncate pages from the cache
//
   Cache->Trunc(ioObj, (Offs >> SegShft) | VNum);

// Now just return the result of actually doing the truncate
//
   return ioObj->Trunc(Offs);
}

/******************************************************************************/
/*                                 W r i t e                                  */
/******************************************************************************/

int XrdRmcData::Write(char *Buff, long long Offs, int wLen)
{
   MrSw EnforceMrSw(wPLock, xs_Exclusive);
   XrdOucCacheStats Now;
   char *cBuff, *Src = Buff;
   long long segOff, segNum;
   int noIO, wAmt, rGot, wLeft = wLen;

// Verify write length, ability, and buffer
//
   if (wLen <= 0) return 0;
   if (!isRW)     return -EROFS;
   if (!Buff)     return -EINVAL;
   if (XrdRmcReal::MaxFO < Offs || Offs < 0
   ||  XrdRmcReal::MaxFO < (Offs + wLen)) return -EOVERFLOW;

// First step is to write out all the data (write-through for now)
//
   if ((wAmt = ioObj->Write(Buff, Offs, wLen)) != wLen)
      return (wAmt < 0 ? wAmt : -EIO);
   Now.X.BytesWrite = wLen;
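
// Note: because the write-through above must fully succeed before any cached
// page is touched, a failed or short write leaves the cache consistent with
// what is actually on storage.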

// Get the segment pointer, offset and the initial write amount
//
   segNum = (Offs >> SegShft) | VNum;
   segOff = Offs & OffMask;
   wAmt   = SegSize - segOff;
   if (wAmt > wLen) wAmt = wLen;

// Now update any pages that are actually in the cache
//
do{if (!(cBuff = Cache->Get(0, segNum, rGot, noIO))) Now.X.Miss++;
      else {memcpy(cBuff+segOff, Src, wAmt);
            Now.X.BytesPut += wAmt; Now.X.Hits++;
            if (noIO < 0) Now.X.HitsPR++;
            Cache->Upd(cBuff, wAmt, segOff);
           }
   Src += wAmt;
   if ((wLeft -= wAmt) <= 0) break;
   wAmt = (wLeft <= SegSize ? wLeft : SegSize);
   segNum++; segOff = 0;
  } while(1);

// Update stats and return how much we wrote
//
   Statistics.Add(Now);
   return wLen;
}