From 189c5b588cf3b908b334c8f807c9a1d56df0c2bd Mon Sep 17 00:00:00 2001 From: Owen Nelson Date: Thu, 26 Oct 2023 22:25:29 -0700 Subject: [PATCH] WIP delay support - SQS, Mem, and Rabbit work, Redis is _rough_. --- _rabbit/enabled_plugins | 1 + ...abbitmq_delayed_message_exchange-3.11.1.ez | Bin 0 -> 43830 bytes omniqueue/Cargo.toml | 3 +- omniqueue/src/backends/memory_queue.rs | 12 +- omniqueue/src/backends/rabbitmq.rs | 38 ++- omniqueue/src/backends/redis/mod.rs | 294 +++++++++++++++++- omniqueue/src/backends/sqs.rs | 2 +- omniqueue/src/scheduled/mod.rs | 2 +- omniqueue/tests/rabbitmq.rs | 60 +++- omniqueue/tests/redis.rs | 24 ++ omniqueue/tests/redis_cluster.rs | 24 ++ omniqueue/tests/sqs.rs | 21 ++ testing-docker-compose.yml | 7 +- 13 files changed, 466 insertions(+), 22 deletions(-) create mode 100644 _rabbit/enabled_plugins create mode 100644 _rabbit/plugins/rabbitmq_delayed_message_exchange-3.11.1.ez diff --git a/_rabbit/enabled_plugins b/_rabbit/enabled_plugins new file mode 100644 index 0000000..0dfabd2 --- /dev/null +++ b/_rabbit/enabled_plugins @@ -0,0 +1 @@ +[rabbitmq_management, rabbitmq_delayed_message_exchange]. \ No newline at end of file diff --git a/_rabbit/plugins/rabbitmq_delayed_message_exchange-3.11.1.ez b/_rabbit/plugins/rabbitmq_delayed_message_exchange-3.11.1.ez new file mode 100644 index 0000000000000000000000000000000000000000..fb7530f361d85e40c4a30497a2469d8e442fd4a2 GIT binary patch literal 43830 zcmb@tbBr%c6eZYpzvg>w+qP}nw(b74ZQHhO+qP}H_uJV`CfQ_nGRam_m8yU4$w?|F zb?e?EF9iw)1N486Ef+bJ|FQYM7YZP3AV&j3Lknjc2R&mGYXc7xV?7%aCnp0l6Fn1m zBXa{=GZPvnS_TGM209gGXduwsSV^P2|E7yOED$i*GcXVk80vq)%9|D;W8XkuXV|3x*}6Pi0|`|~xbehRM8 z;7Vhphp7Q&BxS58^h(GGBLhP#1yw;&Awi-+7)pd(A{eo_C@v|CJi(Yk!gH3Mr#EN! z_q+G@E$g#q#---_BgZNBld1Cxdl?^*CoH_ZKAK@!1QNu61Js|t%-fg`03ci@i$gp? z6N7q!CX*-k?FSFnui6|HQ%TY{H`SM*RPje zkez-|kW@Hi*d~%NIDybmoY#23pB&_v?k`MoG-^G?V<=<=a=$1^2nu0AnKeE(AvwMy z6>6LdK2b$=7NRl;N0vAU?K&ubxp&LL9?WaxBbl|yDT**LC_qaxli4s5Pclp>7-euE z3|}Bl8b3H2Su7uUz~`(ce*&M9QXTj{kRFQKKo}+&1ok!Zh$z8NEs&reB?vEIWjK*#;v7cdO}#BF>t87gm7R~R2@Gs&n^mr z%CFxlczaA#5k*y{6JvElYU|A^A9=9#P-G;LKlpV_DTDn;eZnQ=KY;2VGLmw>CRygS2-C5~ea!Q7%0g>?gNMq}YZ~afiF#z|^IlPs+u&}u@@d;j~FfzLtw0|EX zm?2CHA6Nh?gc%cN!?OjD5gvMiJTv_C5Dl=u;k|T%yDeOf1dbiB|9oAGI~1zeAcI3D zf1Z06G{h`?;l541c^S`iPWUDs`;p(MkXI{;l2SuUlh_H*N)Zcbd2808fMia8w= zzWhXtiO4>)V|+?I>E8zirt~y}GH@1d3v{NmpGi1en?FVY=}d$A_DwvP86uS?2cnJwc_P?|P(q#0cHpc{ zbb;Sd48ueW^tMnCno?Mh72M|LR{Qu>f5#(12o@e${PPX)Mf>~--hWQ)+8nc-((Ra*-4?P=jrF(Dob``^|Ml-;l!AriV4jA2Fo(~?CQnDC5k!?$R)CYu=Ixt zFi$MpkEZ3r{!Rj{+DPP%6j6Ags74_Uh5mTBBB*7-h>&1wpoD@`=1VmIWAx>q$AWuy z2uLdE6L01>SHvv#e=w(N!q9!v>~rd#9{1?_f@!uOZAJYe{Gy7%2F}QlNhf`V=Nbm? zKTV=hJD4b@h@ebBTGhys=p#X)26x8SyQ9|QC@qc&0kaTLE5u^_6wEamuS@r*m zSkh0@x`23`U)tZ84)OAv6EfDjfM~PlKQL@=PZ1Y_Ee%29x^n3K2Z{gY`Pg{}4=WA&*MJt%y-IS)V^D@Pdx1%^MHi>F3|ik4&we z1ml;+j2pgb*#ZY&1B6`$JlhaVj?wXSoMl6fAk5t@gkZ$rQigDEP9BhF-Sp#7KpW=&Ck)JlBeuRf-`OP>m)&G5Vn z^BE^5e^gCjn4+`jLCkxAOtHZ2b}lW$TveDGL`8iT_lk z?+1utu{oa}k$_HKr0<=+IDuaqv#0`gwEtZoTE)ViC(j57Gw87Ifb}(r7y-Vf_c$vZ z9;79LXPbsSBh5p9OMOo~8e$hZDax-92OE(Z-nPyofPs|(H-vd73c+~+F)SU3p)PD& zTRC9fp;RIe%m1%ZT|uyrh0)pkKnp7qE~=nCmDRtGb;w{Vt3qZ5t@pXTk<7^{tx$Yz zsF)Bt$i3zKei=U=SpEw}d+ET;0NE>Me}OZ|e~pBa2!UYwEK>UZ#7u*8>L25={aUEN zsV992cdE%2cRGFkP_UTCFxKb!^@vWU9qKfQjebV^1^Bo=nBHvmrtn~{1^7S%h@3$L zD(2{XbFE+>h8^TYX}IX>9bP_g6Hv;t|4hP~gfRH`A@a~NV}pA&E!sR-8bHh;_3J=R z`VXOOwZjH}*U8j7P&#l+U|>-V@KbqO`u?KY3I=L{f7k ztbq-#K~&A{A?qYa^6X(T!RlTE46h(IA%uycgSBk${mTA!C;CMPO9||I`MLDHg6Xub z2(0&=iBX1(qKopi;|Q!5^#|Ib6g_>ppUDsR-HF@56BhB)z@)O*R z(c@m5hWr2K*`oo`C=W{mq0Ayafwk3cLjgxzua=)O+FidARJ~^J3^ZTzv1*e`EMS2NJ~(MEIfe z17kS*)<^uU@L&&&0XqZ|B87uv3Grb>M}^3hm0Em1sdxV3&%oFE-knr$HJn2ft$Bdk zMAjc;xH9OEGoIV*M>L!J(uZO?Vu)htkfEgyLT5eiaYy(5GqRDJgOARPqg4gRRTZVl zkM2eaG2KKJAsoA#L4ek;{)@sDH7)A_wIzh^WELU^6?i*aXENHxH2=OLOc@u*=|{ov zVt9a&Ip>w& z;0}KI^ZaGED!fw}H_M*TULgUarLZVnJD({VvSM3|msw z@yC$$-l@`xdUw{1GWIQ}S`Z%3;>3l=Sm$XRi78oel2GzQ=1SwtWAZ(0B%%Y`M!JB5 z$)eiwyuzR=aWjVMoLx_`x?4J&)$_jW9Wi?P!UMB;b|BpDd-(1AIALmtq5FM>;eA$` zt~J=S$BjD*?~t$g<@pBv);>q{zYp8((^g3z|5~Tm?L*4efRQsKo|id^^9@;(9TfTS5Fg@90E#bpA>U zFqqxLVmdm*`_M4#&vU_Wbmn?u8V?e%@-qN?pVctvtApl+^9l|(j`)74sOz*N5(|f3;3Y08N>XvD(vfR^spo~ZVUtLbPLbv zJu5)R|0cZxJldu7%<^j+A(6>}jSY#vv(?FkKW3?rsD*GR<3??}tA}lS*04&cyKe?_gkob2}>7ik>C*d8XVctvGA{=kDa}#tj@WDIP2qAU7H%p zVUFtp+=ukG?QHuqvt$w%Mw~|2PAG|jIF7hSbmTd>G6QEn-8hP3$Fxmc72}Xe#t7y= zh;)^U#6bRSRFUe0Gr;|_<%+J_U43-~sqqC+=h44b|yWGp0& zUxBV+A80Hy`PZ==D^auQGbvX)soZ(5ylx8Yj#!gj_iq>+JSix>Ih<~aJeIG;Doe}< zVn^1VDoYLHTsWv}_T$=F!|%w-X%c0ykWP~f(#H%pYbh{jEopmL$Q!p#{}8BV)6+>e z4>}=d-cyf~vr@Q=l2sga4b#VuC}%a=!6{D8P=b2Shp=q;GHuz%*|1`3lR4xjbIO(? zKlg{ekHj<-rQ<7iuLX-sQ^9$qk7I>9yPySV$?)O5St*@~2`{2KL5IscY7rl^oHmkP zk6(5t2>1#MqNY-D6yZSwbV=6CYjnkB?a{mEtp>5UQRbA)Ny#J;SxPmYZq!zN52OeY zDt6|1JWAQ5zNZS^FxBLcBV`+4HVxmdoz|o}VRpJH5`^|QKr%K*re=2Be7%l^zbE!z zeYlzGZ%f?|4)h^X2cBW*28v9!_y_p9pl@w)qNG z_aK+9eI7hU?-|FiQp-?Ey(I-yg}S*Ekxk}4)21oNLi8e)lgafqRL$nK`D`dS}3 z$I^^u?M~-y)c2v;$T=-G4uuRD#8&6jgfW^4Z50>a^SEZ(oU@xd}Cm9NnN{JYRY?MF!2;P$??iD%g8x~nPiW` zyYrCTyl8<-JUq-3_dSy7ZPBZhpaiA)vvgIdr9?m$mzLNOBFg)L|YS&OyMbOKb z1wYl6Rf-_>EZZ~I4x%g_P&62up`h6w-FG7_4pK2OjuA^Y;2eEmpl-Ym56FD}Gr`#* zfx1F>X1Qk9*#2tC>)!c@xuEBm2TKL;6gxVgQQD0ECzpSpHR97P0(-Fr_Hwy6 zTapU(obIYS?P1C3v~CtknQl{yo&t;8W|-DNyGyRf{Uj@vFk63Ib|=)oJwEHQ^C8j` z-;!5kkf59G-BA+|5%l2BUkq~Q5dx5OMRD{Z;9P{7(VASmMH#F|=_JpwZ-0IFrkh`m zbw+X7@&OG!8ywjImQR^=+)^&6kLd<(;tADE7dK zqs`iVvs`C7U21)fl%hR5alguwiXUNaiW2Z8d#%^axV__M-I7EzS>cVH2A6H^wJx>X zwb~*L8hOVeB#tchIoW=a%(0`DI&xqGx(d3J%nEMgRO4&b?vf;LGCt3%2vkB z$;GF)V~*J8rFO)CdpVnt8kdJ~`iBalZ>)p=_UyoOTFYpWi#!SeA35d^_O~jJxmo4< zp{41U+0eEdZ2B(pFJ*Q;v5kG6f#fqt^*Jb2`nRL_$0?h=Q**cB2v&J_!vYbvaD&ms z=fb-cNvF*V16T5C0TXk>u{7~g4kctJ#s{|kLWaVO{XRh(d*+8O0f%yGbBL@>Vezc5 zO+1$8b^Qa_Ws=PmdvAfP-%fbGk6EQMK*M$`Ifln;@h&u^U(EqDv}5KRgZDXS6vKVR z3)bL3A(kgXuN8kVg?%f4gJPj%Y%-tS%dp%eJh}MMpfRN7{9ZoCs|r?b zg-et=+DNiW0`bvw%zXd81#;1J-G*-V+Na5DMY`_I(X>+AzkRR}@^yiQFOKozsogGi zd8s4oAxW>+KPvp9cOcpK-5 z@iV{a)1_Fx#?iXm)+b@?QBI)1R2{A5bFQ`H%5C#h0k)c5(tdDV4WXd&b&p2}Zj(tA zgBs0^uL1zDdIwGej26irE=E9d4&d3J3D03X3GHBysVZ((_e zH!ml#bXHhY_z&t;J*seutzXv*=covceJS}2FdQ<{RTf4{U?>w^jVHU*B949thI6zm zePDHk?zMz-Og6zyHUmYoj#&l0#^)!rF~{JbbkBKNmY)6D-x}K4g^d zwA$!p!tp8XeiE#r9f83Q43s9ja!~McZqqSf=;N7h59S)%c$aY&%IW?@W^V;-SE;{y z#*E)WEcE2s{}U=vH5n*IKsY`nRd9Vyjo5Ql*Em=R0gBw8yBOeRIHYoMEH9rUXsk=~ z2!5QmC|w$#T|b0HmbV_IxjR$0cRBKDU$U1YzU zb;A2xpTXn^uj)P0@P%%t=c>72_QkDW+Bi}Ay6V_+^5}6vorHi#vvC=j-q(1)AJKzOLgoiF0c@^<1Cf*g90^->SrXv;;kiA3sRdYUMw|N64q*8Nz|pE#C!v$@cU+7Z!{vO=BX&=TEE97JR~YvHRH zQ{`-3)Y0?EcTcLLmbG5lp>{I^-SdTl`X_lTb~KU>6EdUB#bsLN0GF4QeVbl|Ip83S z7%eA;#G!rs#uo!?p2_|h6Ch^kJ<*e3xr;+l+R%c>uJQv@dwcCsy4)bV&f^SZ`Dp-ts1PrnF6tUq?U!SSPYGax5tD*UhKE2z(s5zSL} zq_$7{d6Rxt&6n_N9W&|Ck98g({tK8bC&EJMoz&+Pr+r((qQ9kr|30U1d=;VwH!10qO_jFB!KWs}@J74!+QBJikO;FIy1;$oUa=?>%Qj&V&Uq_a{_Kd?COVcF zQ=9`mkvO7=BIMIC?`Ea)vEKL^8C20ApBuw{({_f9iU02_oqqlXyCTET1ZqtD=)d2X zFP9(igLO-In?j>c?mtDo^lu{`P1lhD&HLE8Q!*h)iLd3ibP&=H9> zD?&)H9q*b&uG+$i8kp41r=N^gskO?bS>x`Sa$7W#m?u!`Y#HfRCS*pLZJM@_o%?lN z_v}lma+v3m8{98vTC?vhiSrUTKy6_yBw7&8sZA%Y3)$?_^fxMubn1uxuAOVq{h+Kv ze7C%Ak;%+Hgx*tgW3-LWe8%YIR}A`$#fo7LT{zF`%*on}9T&TqV^xnP8)kA#uOLAU z0WGKTMm4#|tP=yE-1#SURC}kFdhk8(hBSh@box%W$G6+J`u)6blH4peHSn9RP?Zax zP5E`Km)UmHf@l=AQ1B*LN&gPA!Jg?a7(ma*7Wkbr5ni<7*q-t| zYfc7OO0A|bHEKCPGqz`Y-@5X~5X>v#Q*)rFZ@|Fh*Epj`E!VwySB~yC^c{yQumfy7`Znm%(dU86)p_N=?>+_GjL*+lC&$IviG zW_R~Z>GCm>zpifLDvZzA+n!+NW!&RsXWXq^;D%1>{7aH}BjhEDhjYu}Ecn&R*kT`g zF^y&x%6B>~H;OrjJx7THSy34J3Rr58c0#S0mf>&Qc{`gE$hJ zAkmB+kLq?~b$#z|&8BJaq!}MF z?aT&Ze3PcFukSkkD{LxF!mpOJ*H!J>oXjpQJ?lQ*6Q&isp~U%xG`6W9(=NS&HdD<_ zFp8MEolh6(CdE@rq6MqA4l1XG-b6B0%vW=*M$r&xcFf-?XB?!P&jpS>d`&l^2K`lK zR_Dm&(sCe!TdRkPf!}kzrToq}cdnV_runjF^0#vh57V^Kby86`;dxCZGE-MF> z$d_u^dQiAXj&xMbPSm&m77OHb6P+Z9K15&oUtKHyyUuvHhn*yWT{%cJ)!uvlaZ=or z&q@u&QB8y&lXYKViVn^rttBx5fA0So_BbsMic3dbp?FU{#+98yGYS=|cnDk&fK+A7 z;;PFwXW9Vv9ApXD)_m4^D*EVZYmv2GPmv%ThTz6!hMB4GMVEyXqxv4&e3G z!eSDQEN;rz86L)Y*Q$i~d#WUreR1VH*X=bvYj948*ZyR2ycTNNP!+n+J>SS%RUi)3 zsuil;!QjXciY<(bEjr%bcDjE*5%>rNl{^r9Q++|Xo|5`RyOEZEti2^U*`28Nma;PD zgNl>O=w*UAapiO3DqU= z-C_Pcz*}=ZP)Hehjp5t^|4H+0NB3pgMpY^uNNc(79vzY~yA@?4t?03Fe=_}w4wuiZ zT|WyCV|GsW`% ztF)JguABJN^)x(gJ8^U|d(Tncx8Y`L^p7{h2|+u`ri_&hC6x*i9<8UB2I*}6@i8M- zf!Vg5qS1BtW;p+5ekdY(CvX;$`*Y$n(CH64#}#9`%mgDkaRe{b8cW1uFFQOBN1|Jn zP2Z608yTuwpW$-s?U4g(m8dkL?7q#s3~hb@i3#8QQJLsC8@xHs&y7&;<1)^1Lx?-m zr5n$U6en6I)pfI_+OKTId~Mx+5m-wf9mK98btmO*1IZIl!^izKn24>;t8o;$Hc_G9 z^nlX6=S9DPO3xBfZ=Fhg0@}(alv(MkgU|OgaOX?SIomDQ`Z#X-1QUDj(Om9)SUVRc z1)p-wwJvLL_^J;>xN&-mx6C$hdi?>4xOH&+{g0i&WYg~4y>T<0yxR^Wi-}tCYXj!N za-xBTd^8SwN#b?&z<#aP`aj+QjtVjnmMM6*Oso?L>J}G?u;h?%^~aw(3ni|uO7)YZ z6f^l4e=&d8-?n<$JRauf3`6kC>7?)lRx1y(Va{ zQ!=qfvptS%+vpFTtW|)C+MmPtjKx{ZUV8=J(NbM0ZXL~R_l^P`3pB>M>x(bhM3hq5 zPu3I+G-&f2L3881&5uBRvD1%S$rBNp=UY6VdS+7wbz1+MY_@~8?)=s2aN=(14l{H}e-+@_F`iOOQLF5xKsfEsne zVsFL;wK`r;Yg^*s>9AFg;$P2f>}CVjYy`se91aEekC+_Yuld;%SB}rP_4Pr+xip_f z0>P^48@^m(=$|XDPr@eIl3Qn*aJ7ANAv{ORz2~RFKoalpZaGj911Xo=5@RjUTh5+? z2`Uxx$4u`m8aD(VaWz4^)uf8y(InjQwf*t^5$|zo+G?osIR)A_$pJP>rq}!9*}_Es z`&}1ty(acaSajv#;;)3mPFuYtZx@FfY4FL+iBnVLRQp^1>tMP#sguEXCZVK4)u^#m>ixa_CB`G6qiCZB*qdtawC>x)Z*#57 zqcAj&kl=F6#Q1xwmB$~-)T1TXMxlUXHu3y*g5>Mcjq+(gDfP|YqEqi1xwXZXc|C;P zWu%ah>Lyk#O{%k7eyJZI^`$^y@ZT!1b@{W(GmJZO<`rFYPCu$cL;{1=?*(Bg%LkRB zYg$`egOQffr6BhC%1R6Kk{N1ezjVbcy&m!bS8`eN*15OR%~k6#`t-H)GXJr5w3Kk; zRI!5`EQW>fB1wPVTSJUxao4KOVFb-wruyFP)uGqZXlS_GJTY%$2 zqzf{ka<-9r{IAD~0xgmh23$5|&4iN`CN$Bcrr8s%dN}eml4mso+c;G(lJD!Dx%s1= z&$o1iR&!rtgVE$Ws=Llk_r%UOmKeC%kT*qE_EkbGQ?g!4rgr z&3i%L^2GaTv6%<$LMZJfl~CrRQQ2!K7IKS)>pj^kKe_$J+v>)69i`Uk**F5jhp0H_e3cRV?4Q5pXr@^roA<%>PX( zF2VJ);yNne(~ptqN2lmkE5_xUbKUH)dq2-ekWxlX$#pnl%tnC9s+lZM^+KoDz>Oa! z-aFt$zVvuWqxa~w7%7;F&DAyJf37OiH3l6CyE8}>m|MU%D@uVg#-_c{`C8cc z-VoVC>+dosVU%Cj%_7*VmSe8Q4knyx#o`ad|aIl!B=W zlDfC%B^`QcpsUtfr5vSoP@_=!i+#>s-kCqC=nkXdLe=_cL#5?B8~pNKQ@>F$6lP4& zj{r^25TtpQcr8M>&*Hn=Kos+I6~XzrT%yqgu8`{1Yo=1-z|8m3Vd=iQeRxkk3v=zJ zH|Q#Tk+soxS<~+r{0PGvX@axzj!o>gs{KPNH3OnoKb2SdxU996eE4Y$T4HCg1Zv8a z%iHtJaU{T}y5WVcx-#fG`}=7zPlU3HgoK>_r|I&!9u*o? z?sCZH_3xbC=JhJQ$z;PNLRPg!?)6=#k7f1iHjj^Pgt#Z6rUxIPGi|*I<;vFi&seKUyCVq)_YVFkLkhvE<{A^T8IXDF7^tOupuXJ7A&^QASGC^I73GeD?)mNJ{(7$4O3-+n= zw00b%rujKi>-=*W@aS4y{LpHPj-xBu11J}ijN5yxj(!u>^_H;4U2Y9uQ|tSv1ab-_ z)6;$Mp)8&E6VEv>u7Ra$$Jy*;*P^jhuc={W#*tDrXk6Wdf*$v%*XEF{@@;B$_#PJA zKF!2E6B^n!^bfyHhp54lt)As{(P6u~+QK3C(HC(ex2orMC8rcgIQI>m7R+VYoR#k0 z-y`4WaIw6;SWjt6(hJ5D5fp7W^#L^3f@w;xEvck;`f7f>JAK1lx&1W@Gr!vZBc3&J z%9UDq8x_kjdo`vJYo6i2h2m);qE)S*^q4v(CDV;oYahv=nGW58(7`8tNejsP`Dk{M zbdPRe`p&3K=l-~Kdv`kin=~LadZNLH{v&Dx_KXld$zHro<@uGs$LdagM&z}){gb#* z%LJS3BD(cJ((tD_7aBF+gv6UX>Uz^{tTk|`lOXqU?&s|=Ei`#Y-ZHkXys5xBNZ8WL zb}}MJD`|B&OasZzKn)5&PRDB_(6{y7@>yp$$vcWU+&_Bo?1g5!3F4R0rgh1PZIx7DA9qd<>c zlHBex3@v{vT4eR z^ECt9^=AT#sg1}}SWI@qzQ}w(8JHRGv45!VUq=^fiZllgZ2xN12wE=xpBzwb~p~!0{-EamxSRw*Xw_@2& z%1SD`p;v8OA)?!1)Y=MGEWT`>T7Vm8U`SO5sgL#*!S&SDwb4XDiO~R!TY!is#R<@8z6a+Jm$F2|FLV(fU397?WY)4( zzP2a4rylMd--^)IgB6eOIa}SjEbt<>Y6GzxfY-5hXNU2({TZfyH7A54S8A8Y=E2C> zGSTSB<7L4b=Li~FiG8&Bj9WorcnzlEH&>n$r{g$o*QVdp+wf?d#F9{ym8DU*M~;Wo zf2KRx1U{=6YV~Gpuh^OeX`EeQ(VCWSZU>#1hNKMo1;l$k;_tRL>;3qzu%E88kfoc6 z5^brzGk8gi7POZjS-^O)tOhqbkKGQPY;lC@pAm|r{J0ex|8AA?c)IAyZb-$3&MDyt zO#;1L1GT&9zBx@fSe{&W7E4?kVscLGaXy5;8C-wMt_i(JuYI`C+&d9hg&sgVZt1}< z#1+Sx&}2Qz5o36Y75+51%b8-TM#hYXFt=22FSM+y&?yz09;9D6E!DW6LA*E0mm9%z z=5{KOu6=P8{ z-7$Ar~|@X<#oIt!8>ji2?TsFF`=ql9fVkJ*?msxEy$%6|TR-pJuq z9VB*^kB<`6793!)yT3WkA?w02l+|Eog&N8bFZdO`;W*;M`?x*wJ_8%TGzhE{S|h?n zl7&(W!Ww|x4~60fBZLb34nq`1C&U&I7RDz;QWi!f{O5oeA0{qPL=;01LnKQmOE7~! zL+FUu9tIl*BFs<#<^Zk`QzoD-rY*E3$V`xd-lmp8WvxaPe^Ufc=p5 zp#89Uq!6YT1}{ug@W%mE0l}YrRAj-4$@q&Gn)KyUE}|C$kNK#VBeU1^gAMWl@INdl z|I-@chxL>m3RH`BLAM0V9zbhS6r&#~w;!0+obr(g7! zC=>Ua@6Qj{NMq191xlPF1|9)RlsAQR1I`ogmxdtC7lDg{|06i%3jz~`(&r640{&-e z0G|LH)D$9QFQEW2ZaAkM9Df5WOv5h?LoyJE3b9dOnBNwZGt3_;eELWec;ymEWfDC3 zG{6vX56DoqjD3vSl^n|$5(VzlIK^)Y8r1MNf@2&S8d?Gx8hHH_xoKfG=joNJkV9v3 za`ZAuwzT(DW0r-tC;QaL<@w9|%j?^P8C_MzKquYt;;TJK3EFdZHv9GFnXh_x*th&q z+Dw{Q=oJsz2m)evDM2`Iy@8=G38Qu$?x)^H7c@GOaPjBu0m_hJ!! z6AHMkKH=oV^epv+^xRM#aK!|Wj=ps!I(Ty0XnQg*4NCQi`6W z^kYMrpniY8RTxTHhg#iR{Qb@lj%5$0X6ayy?WVb3j}qK8mnXYF(Qc;yZ4RW6Td?Lznv^{4HHb$6K8Db@RI(Mp) zhVi8_F+4Je!GNGSEn+hnkiOamQl=7yl^_qHeq-QnQ#jf2kRc+cB z41E1QUds)Cu`kjL_zXVQ3<-446jNCH);7dkO}>Bc`6fL*ep7nVS+hPqGXpB8Gdg7z zCVfiY?=|afsOWk$99gDKa&1!XR*K6joStohcbB%{!?kVZRKI_0vz!9#AGoLCvRfjc zbq5-Eu-6;~9M2&?pG&;a-c&k%0hyX?k26h6C=Q6g&!bT^U+(k`Pf{dHMVAn1}rXBLoB^3 z*Xr(?5{m8>XF)++RF1PtQ2{?+o1Je3wJAm0p-*EyW}A%@*SlHQUu`d1A@4BrOxXzQ zxtMq6Jqth2MJ<=^D)0qI?I&IV6V>TzzSFgso zI_ZyRWIj!lFsm4nAMcdXalb2XEnRocoHKowjT{?|kq^^YPrdz?w(QZIa@=_(9 zHW~nb=p@E}w?nIiVhf}|7FMfpIQi-*eipI z2m};{^#7w4I=TF>wa`Z$ntjP?6ORd<$ryuJ5T(Ib4@(NeM1rBE;3}|}Au63AGA>LZ zEQ#8ITB8w7P#9gTNt23zAXK>}kUI1U-fZWUXVuMT*GunrmwVIa_vW+xca~@M`D@zd zibViFzmjZPx=P0UQ~@8+JQ*Ab8fIk@R_^f*7-~(vZ7c{2?C&2!VS(bqHP_N}W(vfAy2EL@wSNXr;$X0}G;aZ=0*0`fLubugsI zE+YbUpz>z^41qqx6?BXI`c-FsM0|rko6=hJhCoC6?NWUQzi{JEL4@%WtN1uq1Q-!Z z@+Cr?uFd161lVd!JQ}^NOal5~*uV#4LPYrJ0?cC1yC?zJ*XCh&H`ojRKpgWFX0B~V z7~{G&HS7DPL4v^PJA)hg25(^NUJ*=RgV%iax0(7uM)^MU8{>FnpKN2)`Ri8S*S>Z9 z8=asGKaY(s?HfGYz^#KzuOH&n#bL)^UY?$YJUurcp4@SVyFCm7J*u5w&oC;P=u{4D zsCU&>H8n%`)YFaV# z^7X=~Q%%UP@3?_pW)(SKgwcdS{WG^#5dt}enP;HETZ|sGUG~+-i!b08_Dop!C!fx* zc%A}ic^ANDL450Ul_o&Sj2T>v-E0QB-s}&i6z&BkED*P|&_5AE)CMfaR@^e>k_`2$ z1D=Mq7YaX;c$UP&f;Y!kX(}fa=Tp=zPQ21y0(=C3bbkA#6Ji~#p(LN?XHU;ihbGOXL=-=oeKhH!?b(u6gqD{{LG!!`1&`0GXRDDQ{QnUhR_hlp3n z9uCwW2%_&J=(d^%FpF;{9yJ9P?D|$0fpnh}9Pj%**;ZQ0n9=j(UteZ}6&Dp3pCHpc_wTBR!VT5}_%HayT{9YC$Cu&aKl?ZB zD?-BGqmYR`;-jb!ZxEm6C`FdX2D9h0E+E6%6qS4nL*7HCy^G37B+9qTqAJghY!nz2 zEd(n35uAJ>Nc}xm4gV|p$XYJ5T~qjQnr>TO4+p%w#cg`9r9>U8ryvH!3Mh{Z$ zqGOe$L$h43rYC@6txPq;9{1sHrutyhFo`F|H6~3Ch~W*uOo8M0q{>#F*TzN7V#!A5 zo*-GZo#0R=Ce><&Z>(Orw00CZWvQWqM%|hA{vDP^X(rO+0}%HJ*3k#Ky{3)dA#m@` z$|SnQWeO?6vHs(8IMTF$%hdJ-MdzxdY#+&c-OeqJ*s%aSYi-$AY-gVVc`{erBLz@f4iu*Uxo}je)zzqR9R>&dp7;f1+q;p*d@rsK)M9fxXuzLm6Rrf=xZa zj~5lj!@W*c#r?rAplEQBde@ND3D5n^IEj9T%8O8;&0C77#`b{Gk_fGnb~pG|K&5+I zEi7odxfkVvz%2|9$((WzLC)|AhUFxuWq-(v=h7Xoi9XwhBf2-DGy+vq=jxg>e(aKN zfO10g9ruO~Okitr1nxXsgM3!0*Yq>`%}R&Ns5W9&0w8Z0#E@-_wusZmsi?AuB?K2fJAZEx_ntaje zqt#9#I_L&l*Ynp%I_3_ZxKYJ!*wFEP_vz zxRNJ+xMqw?p>PESc%G^e2ZFtgz#mm=1mcAz8M|;Xh)oEt94VV{F}09%uN1Oe38g6e zHXPr$E2PRZs|@EcdNH5pYmOE@lv{sN74yd<>1(o<4tHBM5Ea>iz# zcOvS4Kp}rQF)HaO{_@75B^b^-|3VCF1vOQ%D;J$P*MPU=kbbc5TMq`6a~uU$TBpRM zcK7xK{G=8bwC&6}y=QK1-6AyU?Jn%@lpo_W>&|snkoz+)2^7!?OB=UwBoc6Ip7nLU zIwa^JmtuMtq9=ZTz2It0)gajyz-!c7KWjGC>Bn#r zec%v1c|)J-cyC4bjC~~7I|dz-qO=F#DN_()2~%M3mrR(DHW zD*@5yrHBIhWp45@?P&r~UdKGWGo6@xD?!rOo&O)Ky~C0)3=mj5wr$(CZQHi(nQv^{ zwr$(CZJYV;;#MlFR8o0}Ui7Kcr*|MP>jI$^yfCtISW|4}>bQ4?%Yu)oiy8K2(#Yl! zvq8Qv?>KlZv7;9-qdjjGi;(Y%Yp$$~$bMOFC|F*cx_j|M5}u%N-e->oSuB(BUZwp3 zP?z@N7nTq_mSj=m)tnYok7th!$4`FCk+OAVp<+nd*&HD(933LyZ^LP7CQoW6Ny~vU z;6;?Rj=AA&zFTIuNU`TVsZ}JbK!5iIW3H~EGFh1;LqjU#o6+g8c!B47h7F>aKCHGC z0+%B$VwOn{`f6AD#^<<-2yWx6PuuSqaxWgQwOE!WsC(GvRUKJ)SQQFA$gCRbCI%VK zFsv$}d2@0#-1#J7T835(mtfuc!3KLX9>g~n#_mL=Zg324~|6E9d;XnFPfy`YfHvZo5PX3jXBBm%$Sf8;L$gzChz*r9$ zwY~Uig2bpeB2r(asR}be z!MkQu#eW~SEfy|;KQ&=RPUXQbY;X2!PucCNEV(D0eGo5hwpgAu`w+kfwE`PSy>lh{ zeCrYI^qIhy6r8NlGh-Z$m)LA1vCC)yOWSJeYa}APhk^6Epw5&V2_bnHHC=x~>E3GU zs)@mDL=APMfIZE;~SDB8L4l`h{d1S(P|ZRP9X}4QeRA(#tky7O&fq%?gg)JqcFm$sl&u0E@rg1%keuM(^%i)Rob-9X zm1;=4Y{R1v`-47)kF_Q?+g@diG;-h9mYvL+?|czuaTn*QY!LZsaTDqImZIVD&?RlL zt9@|T+6;}Kk%uWV&?n_0!rWBqWSV|3syw^XZEp z)CJ*>psV`?=z1vMn;p1>j2fc@J`8sxAYy9NA2=0sq*VG}|=IKJGcn|WB*mMT=xtX&yqt7KSie7Irw6_6YE$zw5 z7&BulA2S{|jQK+{>|3J+i@&P3ZnmHMl7rI=u}{&-=}OP*f+@j`xp6YnWKy%!bI8KF zmU-XonZUVcyzZ1qWi|nOn%6Nb+gYuW zyqC`C9ILSMF4Nx5XXB=pU=BK8?ZfcW>3AkKLLAtZFLLK{`lnR@Cw4+_eZR!AB3~J3l7*4{9Q9UfoN+D^4|HG+CM$EjXE>7 zJ%Va;a&>oqcCwzWwdwuuscHGyiU4;OLv8!)O0?wR;JP_WGG5eO!&ugio&mR8b z$ISlT*ZT|hJhIZ>(D3Ho*cA+NP;bY^IS8`8ioF+{d-o6Urvm>$_FS9UI>@uIQ75;@ zx3}?jSLasW-U6Fn-`V=0AITHr?cV0?{?qsqL;T2pD&n2DIS1OQnQu94AK0va&qCJ$!aJmQDg2D}Z4VI#JI^hI24ZFPwIYd(Hj2abh(dvtmmY;z6X>fHX+()8xs zc6Dxh5usfei!*wW|hVZ4hh-XiSKp^WQx?iP+WpydQ)ZBG(n|3-vO!Ora!&7&;!z zXC7U6hJgT~3VLSlG20x67NPK&U`;-1q>vB9y~RyxSOlUjB(e-zIM?9G5jD?{mkYSl z7EJtue8{%|3WO81O(IXHFa+@jbo@400EGQNOu{&B5rzV7fYKl~q<`=41{g=OLKxH* zab!r*#4N(dXkZY5Tvk9v0T7A=Mlyxm!w4`4h$6}G2w;E$ErhH9Mot2-!HB#9pbNlx zJYE483BV{X3@<1rFUP2$C!na9pqNzw!xRV+WL%&SHSvWR?N@|0os!P*H$Ig1|!t!|mT-OJPLI$_7KMoDY|P?&h~|VNn8r6pw`P1m6k#~ zOci(_Kzbq=Ofk5)ckuZ>x2`RKAU2bJ{UPgr3ENqj^8wC1zyQF)5ye23gM$qN(tnMG zl$rK&ftpb>qXCTs>1j8ogY8mK;Q>VJTChNNKoEWyETuy{HT-WtEILXE1)(qcTrT}b7Jp-UR_^|!&A<7%N{#ySu!4UdnZHLe z{w@Rh{PO)r41ZS`Q+yErmi@Jfm-M4w>~R3ddHvyd6MkOEKe+#l$^X3NU)gW*xkvQ5 zNA`Aq8RU^ACRY6XV`T(%Xv<4o3*k~nC;Pn~pZBM(?B{WJm$?SeDzg|!1HU4s&hW}~q z42&tjct;4sw#U8P@QHNps@;TJf(P^?w$a;gQ+v0!#MPm1Z*5@0R&;Ise!+Zu!(RNo zLjP<%)#RN1RS)&8UG80d+_94XRv{OF{?X6zpIyv-B)&X6bo$QywW;Oju^-OC0s$Gc zP3n?7PyP`k9RT$ErLkRH)6@ml_wm~Qy^M+a4|eBlBhmrbFyaB?`I!d<#sndMV)Vo< zg8@*{0zER(xcx}-na1-EB|=}JRkZmU4YA*k%ZZ!_05$D$M3YoX&pNEFm23Lf(U`*n zI-54@-aI99BVg+a2da!JSZ2voS6g}tmd7M?7xXihP8HXX7GXv;k@u?g?y**K&u;nq z`My0ld#+|{$52=4*$oxmY7ighA3R+L#oG}jU&4I&xQ=F23Fn(T>h2htd#(m*|2i)< zk-l)jWq62YC6}NT*fzpuOrgvp%=5j-mr<&7wuN-%^cTb%4?>=E`2RtB`!JpdyHj6H~Va=3Lh2- zCB_YQPOD?x8+tz1R-~8B8VKW}^3L%<-?$q-f{6yI5r)F>RyvKA7JC&FHqhZUhyc!{ zfT7-+B~#*)r05}! z1mhRqzjfO)AHFfi`{vl-lH8{Daa8bwq@GA(_^@v@5fy~4i_y)5%6zQtUpoYCqWafJ zKUY>0c`07K}&=yHQjLWFMazkdy?bg zWE+1YRvDnJ(OIYuE_`J=aAoM5^>Hv(i0LW2r}z{boaPdKbX}zcvp%?aQj}ChO*?_L1Qs zK=jU*2ubI!Yt24+2Jj8lIOB^`lmMg~bCnFxwtgMA(8pqPwlI6Wr8?Dw5ou1)aL!-K)k#i42Xx!lxb81C8v7a%bDH2wYMT1>ys`lgo?rDK*5`!)mq6GsMRWJNeT_L=9+VH`Z*RrqtDe(DjP zIQtMQF{SEjK-DtL|1r!;MP=|f(p-6h4f##-u}T4qn%GTf8>BJr&Aox71|L_dL=S#3 zImmO#0+|*8$Wheo7Iu`S1A5g#q0YnD@-aZ`ludL;v!K;Z{uKE<)!fA|&n&h;8|)jM z0ayCQU9XlGdhe= zz1wC}?Q2Fl=(6~fJ>b69%k=;G1PUQoPaYS{hlk(dz5L)vm`c>s4b zbU$#QyFa!b8&f;lHZ<$x=ElHu^2{U@4O7e5*z1`^+ri()oLuCiAjdx%RVULQ%6g*` zJ>0-huFOQRus_`LP%unD5NhOu*E-ur6Xv5UMY2kiZ1T7`FQ$j39?uavyd@xmF*GlI zAj?${+odFFMSpPWEF@G}Y|NA;D0G{+l3)Xu(&19lare#3W(}M~G>t2=r{d?RRcCr; zS*87lpq}lXt}`E=WM}ir*Ch-vs95DkCX;Y7tuCy}(93A-H*K^Oi-l1-UERZF!QqKu z`BorliAJxQp8ldaopw&#r4ej(4K^|X6mp;^vL@uG9q{h-{&EF#72~zD45gX^+MARU zMve{dETh*JK{#TxI%L~Zi&_%yp)IM%t1;8_rZ>z3zpo$LJ!_O7WnbT|!Hf1vrcVz5v*XI|u{?aM^Pud7Q7wG2Z3oqYaaWcV zmu)I7i~g+m+eCK^DU>uCo$J#5Xm!XBQq>jh=w-JFYAcpBMt=oi2jLXFmp=P^=8Sdh zyVWNh3*KLI@KHHoTk&XwVX`zwButY%NmqTWNzU9VLu)X^I#o7RHl$OGsX?Cnpw0?a zukE~#Vc3#h6^%^nNqq82h9{;oc&Mxo=;)Uu3@o+^Kl`dpeJe@~|J9!YCD>e-$25ux zz-Q`pPgSJ_GeNFMAJqrTq$umcYPlNso+Hvwsq>IbQg9Hpbypt0ROX4h>Xr5Kd|lYG z!wLIlhaOlYt|gs%^q8QnT-t3Va}`~_wV{l3Nu`QcWfVAX!3Xs9&oWwoxUj7eG-UVAYp$(C1 zjO6Z)X7Txf?%x&Uxfs=8qQ{@HlXgAoQ3hcBrOY~?bK`%vzjYhiJISv|z)$W8h1>Q6 z$%tzKy;Zfb=im?YAb)~CUI{9QINnj!UimoGkX)li^T z=pesjF@C=c(|AYijXL(OW1)Q#3Z$r+hU-*1{lvSWO7fQYd-qM+X9L#TZ7QOF=y2MP zYnbfKopor<8?T5Cj0z&BuW3@hM`#6V_&g5dUaoR1g&yG17p-$EggwIY0m;kfO|$Im zWX!w>-ZI&NI^l%8+)cDqUO!*NW% znC>sPZ)TB;U|yHGzMyX37jkUd<8yQADA!9D9CvI8bGK#;il?<`3BzQ^-F>_==UwSj z&D8q5L8eG84t*}CH570x+|bJLC37=MP!IOXosQIJII!!e;!kpJ&9fG>vGt{`4U&bi zzO5`}iH}zjAI|kUVh-k-a1LD#svY45_DMAfYqMqg(5(nzJrN&TmbC8P85Ym4aoNp7 z4`re%>^^S}q-z$Tv7Yimc}uxrBwh@B4a+qKz=PpH<&E$7u7}*9)M*uH+(saLznMl3 zR|>d>EGd(uppU>O5dJ8c-Nf*d4m|ATnxuk6FTMMGM7E+JA?l7%$5_;0-BC*KzVf6nh50WM#S{h;^hp~xxTUh?dBgBNTS$DohC-m;Z(V5yPeLg& zOd`Tw&DvY^SV-ehm1u~=_hK5?ySv~ouovrp_T8&pt2-7d3>PszGToP&eMahe9yxpV zDQb#R^j{Cgdz$w@?(rSAKdJ!}l5P6z5hzoKOh2h_+_M;)X5@{A2kR{oEx%Nn411CT z-?#WwjFsFUPVisJv?mFL%bQEVEHHCpvZl$i9ChnS$@i^RJ;YFAHS1 zN0x_b30_f{XY8`Z9(Cs&gn5;bOd~7~U!cyiBdECWdXh{Rc>o`+Dq+6-`-VWw<6`p1 zALs{oOa@Yo!yxG$=<8-UBI#6UHc-tHna_xKtE=z9wytQlyey%iU?Rl4c^BmP?TxT{ za}qZ^q+c(INlE;ozX~Q{pk@|Rt(5#GPv}=0d#x1%+KQpa;xy>Qn$21Sdg|dM(yoi( z*JzvZL)ZMFcO%{toZN4$GE_lcoIi+fPd0+zPSm6KA^9-AltT|fpbe@fxcY5h2_Hb} zb=t6lqLND`(1V^hYl*)oU=v%XV6oj5hP_pQB7)Um7#tXf64jwnjJw{*^SQUiMy>v~NQ? zK8cyRnH}o(4K-;raNK^E&NYRygtTaZPpn(@6#LpbnMHe7cjCOHA;HutuQdlBdNQRw z>zkE?0M)|hngMlTtl^z|X)>tnO2##7AIm>28|l&;T$-{^9NLy56lHWwoDx$1YV~zh zI$KroBoEJF<>|_Y$-BT>$z)mz=atd0hgRwE6>1v_ z-3m5#%&--gOrHULDMoS$Sbf?mHsYB@adcFd*=}^WtV5ah+-<$_Y+OGNJhGE^INwQL zNKyxetrtc+dG92UQ{Kc>g`I!%ctCEy<(sLoItmp6j9wxpprax68{3y-MrnkexI0cY zam5PU`p2?#r`b})R%5T>I4hmbV-vtULnluWyDBsC?Cj~69VUA(w6{OyVmLH~p@tUr z8b0VrZF*^QY-S7jGD`|rQsL=}7qwO{OExTAQs$4VM;eu}Jdh0P(VvWOJ(b0J-HWu% zxH<9BZtENXmQ9bEHwuq^Yz*>g5v#o_zH}3m_O0MwGkEihQ=EZk2+Wp^WaXJ zxeX_9Fb4fW5P>AE$SLmt*3SdwB!SHIH^-A>kv0=#xs8!M|C7{&MvgKsbbNAp4LP9{ zV}*N?LoNRf2n{3o?VqK0JApJ%j7CkL-Yoo?F5S<^F?v zEN`+d!((lv?_=sNDtzJbzwGX`H)=0&LaLV8Na>Pq`l;j+Suzl_y#^+c^i*f-0VpF$ zDK0w2ZpokBUwWM@^K5&9?!N+1H)tzPJGT9Ap(|yUixbsB!|pyYWpWqs zL#>1ytzY-lS{AAhss8D>S%7Ara!k{4T1vA^70UJWLtTEv&x6I|zvAu{hvy&A%!^RN zr#;OW)+zpP!0-F=usuyR^__D+Ss6Na#hTF$$`0vQv?mt3%;ovKL*Hb)sNnI@(I0ep zVrEfbF`O0Bt}<2bly{pZKsANaW&?xO#aGd9;FkWYHEtSh=Gb78c3+)pjxf2mj&|pO zGaZ<0#ahLgc#4}!-*N43cPwx?P zIF@k#yBE9yk*_E79WYU56>ciu%=34C^}g&CnseS6f7d2 ztOE-<V(5# z^jlKQ60MqH%wUH&OJtz82XM>eV{|0-NyW-;ZZohP=?afeuO20mUG!+9{fR0)Rje{Q$!55KfD`EC6CIlL1l7Mr`kDRpu_b{~dkWp;0DCga!NO@fZm z%tSdoAJ@VYOBxfw_e!ZwsOrA(z+--0?Ph$)_iE$$K@etGPa zgwU@0_p$bRIZdv7QH2AMr>1JP2`=pkmd%V)b4!hrh5DMQ_LHv)CpvEt4@&tF)?|KG zw&ZoiLahH$f)G0oDSwBgn7ONVwa`iv{ArkB+p5xRuK!x^4+o2!^WB-h2yk_}%j`$; z0FXttCrHNn(8?7BtxnoT6r~kjluSt4kV2$7cE2v@W=#g2Q$wReWfUzmuACG1qdGq4 zSZTv8ocj;N+<2C01+d|AMGg7;4QtjYV%8{g*7*_$cq|{Av5hhXrTC1fBKow8_4~i( z0Zxp{?-%$?gv=vhEE*8-_dgXeWUo#j;Ol=tR=`FahgaAuqrxc1MPqrh$RZd`H?X2| zdnp)dmIip-e7c`R$aF?qM^xiTraxZ@)_{=zQBd3$3|icr0&6n<*%mc0uK{SlpvdN7 zYd~lT*y=ofOm~o=(ht)q%u9u{cxp;sJigEFrE_iq=w1xHL)Jb4UaoUWyBpyejkm=$ z@?@ui*LYorpZAwXYC{V@A^s@2UvTa%q3VlohYIm%uo?I2?)7=iOG0f4gi_9PI zAo$s-j|9g9a$CTAKC9<^T)KT5m40JM%|AhjxJJD5e5^wZ9lH6;NUG4PjWlQm%G9w; zS0)0Vgk3pPW}T)A*iWlj+Jii^l}}%6*p|bydsb1Vqa7HIYwFPPa{3%Bo*?kaIR#&# zC+$cqGqojItU0H^US3aEI@MTqp;4|2W}bMg+5w!74S~JyB&KAg5#KDS*-1BN0!7p? z?}pTF2a)ovq`H;38q@(IH`P=9?m`3F2}&}##5D86gOSCa&q8U~V$M-gGP{F=l4V=l zY{EU>?nI^ya?K9j=4zf(%O*M+Xj{kKnHKmbzAe*&ly~}X&3MN^nK}_f(dD3d=<~lkHzR0pO2lko(gJQF`FYg5`2%P zX$#^TXHR#*s<6bWSV(#GaGAMq86R+vMe-l0x1X%bPi`3?@!E9!k7(%}R8XbPH76NT z43&e~=w#rC{Yz%zixpIt&+ZP|49r|QG76^N>v3KVaUiSO!N`}Sf55Nd>8KqbY&9aD z;Eg6yo+jjJUBPKt2}!z}Di!sZ8;tZ0FKB*R>-ugd5}4ZoH1Zm^2tiHNtd_4gT=R93 zNS<`w<@5>Ivb3vDPNFT-DeLYg)}3!t+R@Pz(Zj@GtF{Uz z&7VH-R9QTWdvMO>WfS(81ol=Eevwieny>O!CgqWboGM=^YNL=)^pM?k%k`>bXRmVm zk7RfnIQo;DRksAup>g}tpPv)-wm5X>nHfReG+RCYB;}?=TRF3PzGSxhIP}g|%4Njh&R+%vs+ca-VOpxhZe?Sj0SUsUR9BSWuVg;Am>2Ycl90jWQ2^|^s zMp-0F>kWvhp2!f|NK^w!h`(}Q(hh~;zINQBry{w3JMi|9wG;BIbvkon$J_K6`KG~q z_pqvg(cjy46_fobkCM1VZZ($25SM+mqqJy%wf&=Y>4w~7x$WIC^3qqadDql_)l7Aw z54EmY^=Lh^Qpta57%swKO7<=e%d)C1T}=~z#)9cK^=TbBsU&br6Q4ylW&xrH1k2#%L{d1uLqSSF2pZbf2M20lz56SX&RYR6oOdpL#h zhgnUC;lDGD$!U{w$le&S#5g=oY>3iUsO^$JINXz5}- zL8Xovdpw9;21}J0gffkW`z+-tY3O!d)ft8XB-{OcRO*p)Rdoa-SZE=we!; z(xvDEnPYI-hyokq?q8zPqiPL5`o-mwRxb|!o$QaH)F#hqpv>Q|VQaU6ahstzVg=YBipLYbm`(YSbWUxXIkr=S1QGydfu7-DOtv8}1Zrug|wR zV@mOL!vIyf^=;>S(ju>HiUrJiFL{pEKGv(aoZ~gQL5fSCCDdGT7)w);@~D<$1;sVSw=#vpGL5w^A$wLQuFP*qjsM+v-gR?46@P^vWV=LJHf^AXw~EZv4(kDZo01=GG3C$hAgL=Ny)Gr9A9_orAx9 zd;{$K`uzsQLGXxyBhXMYz+dGL!`kWA4 zlYctiv-9)hNCo*Qxi0z`)FC-!*K?7hl+;Hahx`>}*Fss4t7Z!GCDxPro5ozB zr8A@#cRSvS!F_KjzpQXv+kt4z2``kX<&zPK7gZdTCTGHiQIxHff;^$jGl;flK@&xq z^uqMR*b{FE19iq#CjvTmNrmJ{n=*bMA64#1%utO!ZBWN7>Z7ngVHg;}M?l83LW$?vQ>-Jh`j0twW)_Ec)vBxiFHDBDx{b6o&>0M8U-S!f)B()55YMxzJ?qPbDxS#Ngw_L1g%52X4k631y zIFlwmmkSi;%PM;$*_>_+J>QqZCX%kCMl2apM058y4#nHpPiDF7UXP1hqI;jFyK#R% zKnw36peA4!f(Xh6A;ngp*v;3nF=$g(x)Og{gtE??uf+nr8>)=k2XPaJv(igHOoVtl zF4UEsxu~6-x!k#hg0-m2`Ts`|2+<)N`k@BM_=qOF@k4ih;CQ_8(#soA)#P~IZJcrJ zd4RPx+)oxKL;XVKLm$Sd^pW&{cqcJynId@ji9IA^)s?m>=`{UE_L?;yanxUEF& zmXNdUG*ho1prW8kP1*gOGD#2pQrz&)U5zv!yw=2HpD7sXK4hj%{YOh?llD$h096 z*v*Y};6QFan=I(TJvcXdjbeUUJJ4Mx)d_*tLmTrA7TsRj_({*p94!&dTx1;66gCgrIF!h_*4~Zj4nW|k+ywTheXV-b zoSJNG$mb!Yn7-3WSkgzaez?NQ5vfcE72fud_rAcyn1X$3y=C?bsEX$++D;}ic7_M) z3G2~};FPvJ0Y!`2OLHk2?$%Ej$w_%r>e(ZSFO3&{6a|DvwsqQv4peRc4b0)C!QTsS zD{XRF^ljZ90!Eh^NHQWqY8dq65fs+NCq6hD?w_q zZ+tx}>(UIpESa^s7q3|KmKefMrU*+3nf5TFvT z;FA7|uE>lSgD0@$SQ=uO&rN-4W{+;DCL(PMRB3isZC#r#WfM=ef-#TbDD3a=wktmB zd>HX2t!!wSz(qEr{#*4@i@I<36JQyRo8%H zO5t0fUMRek3hBzi_7tnTC4)#~EaT(KVq(P@#I8Pl>OjByS`EmoC>=qZXB-d3;BuQW zfdd&tPS;LBn z6}j6+?P0lx_Yz`e&Q=y?&71DaDQ%tZC8!~`U}_9+zymnfx^}954{&L&$a@h*08>Mj8ntm8t+e~)b z`}>Go@}2yH8Wiu}J$sX)P)=N;#)pe@BpXLgq6T>0XVURj{dYcO_E*dc2YoeD77FMW ziJFbw)=UNCb7hdzgGsyP7Q{4pzjNS}@NRs~Z-Qa)egC*1KrNZ(J3~&@KdPJ>eHcBP zmFx6dAbN8Lm8ogVUZ{z_bT;crTd9V^I_aS*X=Mk?oPAXPXvu1!JF-$efTF=&A0`%& z?lNgqr@$0P!R%45<o{Jw)(8Zf9O;5J8!W}mzjqedq5b;k|#~s`q z-JzgXJ6EV|->mr`^f^^9a%YtmuN*9tX#Ne{+jj_?lhiH8cB#d+r(*@|h{(b* z8bw8sZ)|zwyO*=%Tw#e?_@dB`^qFikWe@k)0o9KRNV@;@5BmW|~(ng>TaRi|C8i(5!11K|<1Hq6SD=bAc5 zwLR}^))V8^x%_hKgVR&$`Dwin(qBZw(;D}IX-l46i?IwxRmSb-N?!MtK%dMM^*awO zVF7uMm!>D~n~WoK2-KC)?eD31uP6`GfKFr}H~;__z`q{5KVk{@s(^_B6ay{>lxYAn zFz5i#fkFdE1{fOXRN$!~)B!02(|RZk*lM^{z^kB`{!#D{4-n)iqZ4YJ{{M&0vptq->&EUtHLcBLTxY~(vwsjhK zcDfNAvilx5D9VH2x0mwzu})76p4fr6R&L_pGyisag2=bQ1Rc2F#tO(ApkWN3&gc3E zR(2O5Z{v=Do!#m~2=_hO1kl0hv3)?MH!5gT1{vJ&}IIf!f*P_Sxk87Xf~5_7*n*Pli^0V`Ip1Si6HJabg3IJG(f! z*Wa~&N18^0Ha34@Ngji{+CRRBo(cmzJqLIZQyJvF>80Dl+JGmSw^zX6gfwv#1iSZB z9q`+!2cf6nPOm}7TpU`Qfp^peT1MQ1$}@3I2ui2oOpVoKyc1m`VUh;LP_e zkoXtD$8!u*a8TlN!JY51ehLHr6&Ns00D#z2{gbAOqY44XhmwvSCKOy4CphQW|C53U zd){Ll|Bd}895_rUTri<1viOG>{LLR2|A!n56CwPsP%&TS@lUYm8y;(fp9P8p4mDCN z0!rqiZoB+12m6G-Ly^$YLcxNH#RbO-Q^!gL`(U2H(L#|73xNx+Cb|)A0v7_1mkSOT zDeQJM@^Jnrh@hM{ZJBdR(qlVf@VGb-3BPNkM~b9 z`~(mHHw-mmDALF&Kiy^nq?-LF8tk(y!*SiNGa~?(;2+XBzl>by#8Wg<4+&^RAPo;-H&#-@=A{hr2z$@cdtiF~#T4M9l#k z+gjgghaJB+hoX@4RnV`Rc;8IWC7N&y(xmqEOvWJ_`lEKof~2C5TNL14?4KU>L`}}U z+wGm#{?6!N$H?wFqx~E^cxWf^A=w`wip&H(F@+v*TNHe8GP)eGbY#cP<zD%kT3mKbLbg>==b_v4lZg6233&QrxDVm+NWmxBim-k=~4z8l| zr5&4B+n2X8cI=kc&Nc5;&8*KBI{S-JvM2$M;^}`qOh9%&Kt8=%|G@UZ`AYYyVoZsA z?TV=J<~!P(r&xcWz1_R&oo$3vJ&5)%e#)Obyg<8o4fb)OHq8m6Icc=rXeeo}{JBZdO`ps;tT9*^$Pq7`R!{QcvI$mmykJ-gA3m3x=(+UNDNZ&KGSO zLzg;07m-UxS-pDWE6xAh@K^5mpXFK5ZkgmUqg*i3Wp0S}EYaN^p?rb<0MkIUt(*Pq zI=-)~+aPe!vo}9W`J`UA83$+PO3;qc0P<*-=VWvem+_x)8>J}~E5c;L^RA57MZHqA zOSuIua&kF8ExN}bDz^W*zonXZJNWx-=Cl4+a2H-0Xn{0KA4sLEo|TdhJgy{c=E>Yk z#B)arA&jF}eOYrQ^XAQ)%Ify14^qZq86p+)b^w2JmV>}^;ivWL(#m(B815eLYF!CN zz|%VxFI*}lx=*CX5rOD}r!|G5Q{vAX25FsHv1Xaj{^ zYC1)Y*ynrJY?E(vP=msTY}&lR9qgzc<>00oSLpB%@3|^g%l$D%xD`;{T$;5+G##gg zr5Dg*zO^_9@8M#z^ML&2pKbeW)t)j$|v;1em&BX8BFj zCdBB}$G@$tXC-Dy?jy!g&XuzI9e7q9vY9(H+~L9|Qu6!-{VGH)8Lb?#@i3^F)%7PS zac^+!(H;2r=Cvm$4?}H<9S85?@a!5q?w0S9tGzgMoy@g>H!*~MB?{X67F&$MHy1i` zWAEDFc~-sd$iydauhOYT5rY&WHMIm&*e33J9R$BP&BJl<=qECq=W30fvaf?-M?-xQ zp|g%mUkUMk71>uoX=dyjd!sS^bt*(DX`}o+;~KC=fMkhF-A9T~Rw97{IAS;8_X+GW zkCJhTUD8*xEg@<_-}AY{a6aUDDC#(MRp%2N?Hj$+-XAcV;OQ<@s;IH%MtXKEolPyK z*|zZF{h~!h;JX}fKiDnvPo5K1KpnfFMCyo&%#NAn0Z~<^};=N z24%2tkV-V3xFR3+9{bFK7B?(1mi@GmRVRUUdg~^EBUB(gb27P%=xAx|o3aA)+DeqK zs8t#yPfJ?XRxOQW(%P^&Lm#Cx_<;3hmh_RMZKii_;DpmKF1cHJd>K@GCSi3=eQ)16gwOee#q_$jiZ|a6hYCSz z0)@?Px?>}do)Q7Sb(q)2#@grTTy&7tIuvTiV?76z9 z6ifUwGs3s0*4V$=z`C?O$=Z{Tf7*lekE4m@dKbgHkd$$})+;=3=3Zn?-7_Ptm7P_@ zek9_Ot4{<6WmOZ-@;@XaM6PQKryEVB>$K}KJ-tkz)9(HKgMtx4f^_*AA<J3%bMhC({|LMb^YehfI?^I@ghzCD+(ePjAnp~rhnLIUs60jEEREPSyr4x~ zXpt&I?I8DFi|Ck{8SP9;mZqf@rKK{Gr-{b!Vq3(EWx{+hUuVZRpN-H!Vk|cdeL`N3#XU#jKL4Q+w&v^}S)%hQ-on=s6VV1QaL4yZ^TX1)G z2=2ilxVyW%yUWGh-7N%ncX#LFdV!DbnfbbVW~xVO>PWr!*LrHzIlJoAefM6C&bYGD z?{X~Os~n%#2_VyLYsA5?(o7upPlL;_qhO9h-HpxAomNQV#iLD2iS8Y8>*R*@RqpuI z)pe%0E0tuQY8D${&-bjznk(66|+!oZ@A2&mm>*FPa7MP*o3 z{xyx+RpLP3U1fPXt6^2-{af)l>CdyRRZTH?soRCifZs!tkfX^? z(N%qnC$Q7gbUlDpKdv$URO(iBR!y0iB4tTdYUXJXB1j?N=JWIzL)g9Vh`Hs9$Rs+~4JfXrdO3}$cIJAlcQUxWU=|o)k|ZaWX5O;PH}~DUbpqfWe|kDTakxSIa~>c5 zw`Aso^2SP%f7`rRQaY;AEPx5cn%&kffF!gi@ z73#nq{mL;hf@#%#=o6`rIWIStq9of8#}MSr`pe2qa*$&sb~2<+ z0@)Tkw?$98Y!n|1dzbzodZN3EkxR1hYag*1wnXa61$?|)A& zx=R<+x%LrfVN;M{IdpWF_)OC-0(4jq=ZJw(7 zBUWbu|E=3D`|aQsW3yziRpZQzv)+)ahJ=#>+yWmMFyYAxzp1nNy-Q`L=_4UWRnyGH z?B-<~$NPwzsohy-(#Oi~i=C^(Fxc}SUH!SIQ6_{+C)~tcJ_QZXGdu5S3<1i*q8&$y zT`H{&A362?Yn7aQ9Hf|%T*fj;#TgO9D2cAOWucFAvCol&+cmG@I&i*>bC3NwW0Q~< zqilk_j#>`0a^{D(gOiqnQ=RC2GqZCe>L_OnO{LCT7WzvjI;R(|#M)(M$b=&M;y=O&{&&$Vs&^xBZIeNvmf6{;AZtX{g{sySf$fmM^2G}Sc2gd zJ2}b*fM1lC&_i??jw-eNomColDC_f`<}9t4b=;=R$EaZQ%}cZz#JWA^b}?P+a!+Cf zq2((gVUn0N2!Fr_S`KonGd?z!RC~g>U)TOcu8poJQAP881MKDZM#SK~AEC5tArMT_#{eJ*6cL*`iUO^%@45 z%88qkQl(Ya=8rhCFPzjI83oX9Z}Ump5Am%gET?rEk^44@bF43Y4UDBDa$N#*K_Z!;IaU(vzpyH&bd$f z`*4w5?STnSO``QSXj|vSC%C}%$+cQ}M9zr%>Ii_=%Ec>0DZ_@`pfO;l<9zUTS` zHt|b%JDfS3?VaudEJ09^JvfSbmPq?`y>jls+#fd}Tl$*l9}9l`=tQm`%!QTTqgq9K zO~n^K3ubpVmLuWbEfh=hZ!oY7Xz@K(G)i5U?vH=`)A|-cnQ+7l(27?xP;>vT_v z4gjFCIy~g1FrP)J0~i3ZeLp_!(O_-l3whU=m!sm^z--HVNKW5eF@{>g{Q*}Hp~p($ z6|R>5gP)RT&{d+hzs~pbTd^yVwMgxE9cV|THU~Z37U*yu_Z!J=jFUc?jZ|5c+bh@Q z$reH;GrJ`HAdE)$%U!}ivgowZ6^}yNQLw?aK$1%rD}1oif_X`S(~-Cq5u@^ zm$-&vHu0X;rnhiAA1^lBie~kvhGv{fFtp)|2J~hP%mmfl-vxQd%{c*uW7WW{iU4J7 zS)Jd1HffH2XvhhU)^Ut=zbDs;Iwk5@VV+zs@OjYR%OD2ijF%&MHIPKm5oJ_2qnV{% zN_$Ef)PBvP2o&*<0&4>yg@!W2cg<=%m)cyg*1#MW)bOgF5b)=kWM~>L!%K~?YSku~ zaJyB%G4i?0wJRNSoE1Au(Mm*JSHST7is>iPftonDBbNNNoN4V6sRN$NpAkx0Igw6_ zaJamizLACk{h$yqX*tGc;SO@Q>oPZIl*TR)v;{}PqVbbJGM7X$Jt4)?;FPQxgPnx* zh~<_zn`^mbk-{>W?77tLoPxkhpb=;+jN__hOg_Juu&D2t*YBBEaWb-i!)ZhN=CnCI zkw-!RKP9pUV|qp69?5(5*-+Ggd}ncx{ppbMzSpqs1x1--!wy1t>Ef1!^KE)(fR|hl z8SuGC4~)|59rON11qwL>?xeEhM2xkMfMw8tj9N9iRI{K+k!4aZs7a5YPMbuW5*`-d zdSvj2%@W@t*vGl)O(Xn-zuGUE$y~6poe+nVZ1?ve$B_zhh&E{z2nl3o#Dv zr&0(HGsZM#OSvdZ1zc_mF@p%6-}=}R-+Q)soS~5NY4}DB?~{fxIL~~KQnJ(f=s3dp zD_=3WPjrV$+Xc{lVHg;Ro%Hsx*H_ew58|nPx=HRM5xj1#zNW`mh<>vdyZrg2=4p_5 zsNpy;+TMu8ONOMrzLiQHc+8qM}m0(SOb?i;}T{(R;;5o#~JY@Oht;7W9Xk zw4UB-KLtrU+Z5%#{tlh`ffq;1SWJXdNt7`%dt_~JcNh67C`2Tf6Jl7jC&8n;ZhmLJ zgjgHq7p-{r?X3?-%{FMDn(q+O@ir~R$sGQEaq5M+Pq?WLs4J$DOEUb~@W=iJDa#kj zm>p(&wKxK!E7A+;UrQ0O;x(vrP~r-2@KeqKqZfvi2l6Anzyc$+sW=5>pJ5p(rzI1_ zvADQ}j!@kBuitILLMS*wi=)~4PeN|?e8;{=hCQQXCH1YH{!;VffKksY=5er(EeN34 z@=&9tm}Xt{+z~UIY%xDujiDM@#funhDD?#TD{sxZdSnaOjoi>L)0gB!a?labNUiSo z&qg7DI~nStEDvi%O(QLCZ}7BJftiDAB1h$%vsjGZ6is=J3AR$ z-ZIN^&B$wLrpP~a*Fs`ZYxMp1-BNJT=J!Uk=i7J%Q2J)R$OAZ2??BapgrQ=DaFDIs zlC#nhR})*%RaewUE89DJS=Eu;m5h!x3#T3x-gL6$daWxXb^3#iiV92MS($@u#JQd0 z_VqBp91EPDAz6K&V!r;{Sn)G6^j(wo$w%}HXMY8Wi?V?t;l1pyxq^%J}lNPqz;o) zc!P^~l*Au`*OYYU;_~Gu06Wen^`?9h<|k&J!BKf24)5XHMFLa=?i=4bRuP>*PR7wz zWu8VUFVlE42Maaw`m~I|wIgxHJmF7gkynMX+sbTlU^L_(EVVVu|Y@QVdtXL?6|SSqWm)3x8soVUByxRFtF-oPSi#wwk>y@atqA0e7Z|-e;2zzH>2SgXiNU^&bfHSSRA2ai@j@bSFe6qFTKG;7+eNHFJ zbEhd_9~`ANt>}IoM9$*1`MLR}h=xRaY}Jxd4CCb+0uS#FMtohZ7d#y>Cov^hL`NU7 zmF?OCeo6Tui0*hvC7&;86-tM_?@QV;^-BpouXJR&e@Af&?T##}ffoXkxzAfDgpO~7 z{x?JC>?LiIA8}9LpTI~(%A|<+ zUmsS~jNPuW@yi-3Ul=uCJ`Ydhi>Ys-J{b^1hnpRxtcdPfDR|Q}iIWdt?B#J5&*rA- zd#%WCHa2I|CQKle7ZV+qy`2Ya&x9 zxszekI)NX=FTxS2s2{yd3lm7T{8kBUvSfs@1TtZtHyXG5@1z1T_4wV%2oj%hy#e3W zIGbbh(YJS@(F^BNTY3#^hgMeC6pCxzN0I|u!d-Xc9aZFWzV77+X(vF6QSr=yVfZnZ zE15SX_!0@CRwY<&94mC^5DDt0+<-QOWET1pGB^VqOa0u0P1;_$uhg5t6vLzE;!pPo z4Z>2F*Zt3nUeIY#%SULvAy?$caS`fWX>bS5&B84X^i$O?r7^>UT&3ag4p2+OgOt9C zE*Mi0VNfoPIpPLovmWf)kQB96xMm-@c~WJ1L*2P0-&QPXv=Z9mA1tG~4wG*XHr`+H zr0ejqiF?=sgi@^;Q=Fav(@#`c?W6T38Jx2TNL_iXrr#8wb=g}cuSI5L4aNrDcSU2? z4Apt2ft|0Ps4k2{?Hwks9d#`)=^dPFA<`MwLiW@wWznz-u`dvSh?m^Z=rv0l+q{5n zv|b<8aXMr8iKAuXa7m2nDnVyBh~@hespv@}dunqx=O~~$33^3sqBmBAFfP5mhUfq+ zXC_8}37*Y!(eHD}mK3^^)u|A8O}>YSCWm80pH7`4S5T|^PF3kc@0sKQ)aZ5;?d#C^ zoM4LS^201zXib2sdeR;9my07Ka5*Vb%Mx4V^g-DS3}H)jU{jqY?4^XI(UVzYa2R%x zrB(7R&&xnvC_cFNRTQKPS6B(j{Ia8XV{VfYc#OT;J(}j!A;_zasDEw>-$HyWUg&Ft zY+Vg4t&M`4sYM)jOiD&r#E%hS%67K4^Mv1SsGzH zYE2}p8}q$^Q!?$s0wJ7H&sbsl&PzM`KMB7WBTaf%dh8`CJtNd)S5`;^!Mmz|I=Kv9 z;7?K~P1Y$@PV!tOMzl({2liRf8m6r7uVhJ&*u-@b| z%uc7vK}h%oXCn3s6&*|rZqYG`Y4*J z&Mps)#LXi8z5QUuMwlhJLnfeuE~h;lw}dv=!#ax5BGDqsi$$(X9Qh~Hj>Q?IPv4wc!Q!@Uo8^rB>VUJ!M-^K$vFilM*=D}Du|*r z#SJuU`SD!w0Y5E}gPBb(3{$7<3~mM1pN*E!Jx*D;T&STW$M#Ys0th5Wjki6wVU4yH zc;M_C<}Gk!BBp4{baqTCSZo;hMq65yKv?ey??{BqNP7guO|wZL4#f&u<|Oy>S$7!7 zlDR3&)0W@UTt>W&(YJ(^(U#XJ8o3G3JbGNeK1N2`{4M3c8!(UTN1Q$}HGf|>%#7HR zw_g9g!K&-;bE98wnWEEBfbbWKS?7tqab9X= z%cfFKYB%z5m!xa!Dw+KNHAn1U)}$HJ>SVFQnyOj){>D$MQ!$1_9HIrno4Ncr^*MF0 zWn#HAuSVd@2~d?4_}_aD_ztL#^^kVfM(MtS%wXD?lXS{osNrL~Vk0w{lnFOIKt)8D z`N~F*7{R%YFZuMfpH#Qj$q1$oV90f?wgjHYS>v?eTD9Qtv`!kcJ8rBO$WSRt-_~Y7 zI}bvKFT3d!-BU<@Ww3hY!y|`5fBJe<@fyzh zI3o&Fe&?k2XUW@iK=ZzJJ-k;BXndMRxS^r5`y?mNNhU@gxc*0ie zF6HDVJbYEVuFANiX00jJ%5$7eRObctth0C^aJ|2QIzbL2>KvvpVccZK)$8|8Yq<)J zj^W|sn~?iK0tAkwZc^oOR-Z3w>weRh&&fvZmZnzeq?S9>DJQKC*ZDi1l}UNN?aVRS zb!QyQy8B6e&$oRWl@+kzaoy--$i)dH2e!U}<1x`9y_8Y(BaP8g2wK&b`yO?Ty@x|N zkS1b@MSX0tk8JRE%1a;2CH^snl=JvRmy301Kkms+jMZ07;0o97gPE<5gzbVCt7%F5%A{*zsxjIk5USr)9##c}2X(aJsB zV8?)epR*A*^o!_2#4@(Ta}{ZE{5^qbxHQ2R>9!TD$n;f6^Mx!dwX$!}1Ks11j0tl* z1?Ci7J?sXb+92I4Z8;E6Wjnfdx2ZfPUP)s@oIdu4;kQc$Pu?`%07>A}@q;7U;Otm! z9&5YoC#xhMtmn`fB3M=kc>CPv{w0{qypyNh?dv8rEah(O zwnXoLe*^WO*4;^6Rc-F1NoZ0QQlE!Ik7`l=3?lGWOl4NS^)A{6f?lm{X3sKj>y#H8 zKc-{)STn6)3N9=De2ffASdPRQKxH4?*=USd;lAlvdlsLbus9pmFMnmwbtrX2^Z!+A z=SmtNi_CQEU@Nlay){VL+HRp%_X9?qmet;ZYML!;xJ%u*qs$b$Np&{OjY`I9x~rBa zA3k7 zWLdQ?I*H+Ak?T2flC5&#l`-=#N}7N4X^dpvDNfW}q2*cFv`=+pMQ3Y}R?1=D!Es=(Av*`0Cg+6=BZ%9C*h8s+|LIX8A1)ROX7 zD}$9|@KTr5I3LK$e&yl9InirlLtsa~WU2hkHg&*v2~74o=UM}x{Fm{DNmpYe>(ssV z@};j@u%L~bI&RGn!=2r|r-$(ri9YHs3|Xy?r6H7?)$tMTel{~F$s#J@7^eu?;{qF%p>p2x(GQp5@eDSCwdo2&O#O;mmmQn1(M06a{=}vG zQrPE#UnF(aRhwc-?z%)4`-`h;^Jw-v=LZjcX^PJ!KvKTerRuh>jDc}=gvQL^;%lv% zye^%H$X8dy%!1I!t}o(om)c5(vt^T>yHurGb96sotyygB46orE%!7rtGbbXB@4;*x zAj#_GniNG>MFXEeYqs^A=N4}mL8bIx+^B*b_-JOg?q;7XkV?7}xL-LHiDW(L!;bSd zxUht*a-_BKm1|Vet#OAY-zo`1GM|6^P6BKE#Z8{&C2m+G4eZrCjdSC(P`AjASHEs- zSw6`E^15*2b1e{HCt`c{XxAOP8wnX|1z+a0hlsVuv{EnFX4*Rm^pT^CC;n zkMnWZPavkU;Mz`7NR+lYOnAc_A2x7YhdHAP55fSqu(Ah+SEezMb#G4Dza4*OrQ{At zV4-aGebF&e-EWLb$8n9~moo{34A!xu3=_)|iC+5&zlNV${!Uu;cFZFjQ=g7ZJu%6R z60DOOWaWcTAaCETwYx7`>^HBYk~!&3D(r_CGI$UZH4v9KGv6W`%^H-EYk_s z^vKk2@1;u6$GvSnXvVk&!w(Xv;EFO*$i>0DPvzswI$k|`jHubJM14*L-?yFI!^i25 z520(C)F+0KwNpjdFitV&R9xM$;5Ykb-_Jq+8>W^|>d&{YNga8P9P{Jk3Bttp6O#wW zF7d`sD`Nd!<(a1pu0aE>LCLL6Q=T)VmNo~i{YPa zPM!1!9J7>p@*$8DHTxRAung=;K0Tb%czd6!JO&wzI{quE1w41TzE9zIcPSHZq0sOJh;{YD5u#r)6pF%w zJ$zx-RcLl?R#w_u8MIy#LEbLpoz=`XuaQ2R*~|FsrpR=y1~E&6&DQmzB{h(wI_V71 zq>H8`E!S4v*nz`GJnz?WznVuI)oy9wx6=tF5)Z%aFN(b;^P^h#S{vQuwvGH2FF-Ph z{%SgRO(M_+{E70KCewg9@y#d;%FWCmVW2Rl3Y{xu%( z!o9Z5VV4rC%7miP0OHS1oFt9<<;Zhji7d=cXA%6IlrQJ6hsjc&voT7rP}$Wuha+Ic zDNS>}azePoL7sEC_5P8|W)xoOw~Ac{FFN zWHPc>4a;mt(JWo7wGo4(^%kb5lmz8#`Bxh@~0rq z!M~`FWn8e&uN%Gg5u7$lKHmij8O73@63#dR8t~H@u=UD1%fIfMJ?4>%SSa5)k%|e0 z5~b*a1*FHKt{(JyJ@np4=cB_lTm4x{H)l+DQ4I`-bi8L%VwMD}qClq!bwgcN9*P5B zcTmm{*?U{fgg9-k6dNUZJ?a7ud3tR&T)g4OUU*zm6Yj09P$!=iT3&;C@)XGhecCK* z3Bv-TQ%}O}Nt^FZSkMzG7Ek{w}5a(8_-#O5$AI)=)X?~03%q~?GD81JX zKJ|Vik#KMs8M>S!Hc+%#yLsST(}7;wqZ}f@8&1U$l#Sx4`3l@-U1vWZGU%x%m4P=+ zprj*?4GOnB;4R}^HcogJl~jEs>o|JyzH-fPH0KB7Bhc4A@uc_6|4zRP52hozT481w z?&9t0H-T~~Ap+*3tW5zh#Ugx?(+tn5>xAD4hu4!%5r zDHsa`Hh7dDnSf#r))cNOcmu>A-zEW$DexmGM`-s?ufA`;!E})sFcD-XH}Sk*JT@~n zQ}UQWZ`^yp)z1j$@8ExH0l09IQ+BSfD>?!L1A7Dq15^B8wg6b!{rheII{(%bK=%Iq3E8%cQCDvGs5D@ zQ?Vk{4#58Y#tVhoXfEP$Fb+0{u5hindulA=@{vil_0@BXa7fA z|Ct=!Kgb#UPvngB&8+_m&A$!BCo~OpOl1TEgW&=LllcFdU|F;w5{(DgW*rxRNP$EwMOQ?UVPyeiG>A!~ij|KE!;6AF`$8#lK_`ieu jC-42gl+zEe|D=@4OF=??=z_s~90nhm!JS2a`St$*0j= delay); + assert!(now.elapsed() < delay * 2); assert_eq!(Some(payload1), delivery.payload_serde_json().unwrap()); } } diff --git a/omniqueue/src/backends/rabbitmq.rs b/omniqueue/src/backends/rabbitmq.rs index 7b2788b..2185f93 100644 --- a/omniqueue/src/backends/rabbitmq.rs +++ b/omniqueue/src/backends/rabbitmq.rs @@ -4,6 +4,7 @@ use std::{any::TypeId, collections::HashMap}; use async_trait::async_trait; use futures::StreamExt; use futures_util::FutureExt; +use lapin::types::AMQPValue; pub use lapin::{ acker::Acker as LapinAcker, options::{ @@ -18,6 +19,7 @@ use crate::{ decoding::DecoderRegistry, encoding::{CustomEncoder, EncoderRegistry}, queue::{consumer::QueueConsumer, producer::QueueProducer, Acker, Delivery, QueueBackend}, + scheduled::ScheduledProducer, QueueError, }; @@ -89,13 +91,13 @@ async fn producer( #[async_trait] impl QueueBackend for RabbitMqBackend { - type Config = RabbitMqConfig; - type PayloadIn = Vec; + type PayloadOut = Vec; + type Producer = RabbitMqProducer; type Consumer = RabbitMqConsumer; - type Producer = RabbitMqProducer; + type Config = RabbitMqConfig; async fn new_pair( cfg: RabbitMqConfig, @@ -168,6 +170,36 @@ impl QueueProducer for RabbitMqProducer { } } +#[async_trait] +impl ScheduledProducer for RabbitMqProducer { + async fn send_raw_scheduled( + &self, + payload: &Self::Payload, + delay: Duration, + ) -> Result<(), QueueError> { + let mut headers = FieldTable::default(); + + let delay_ms: u32 = delay + .as_millis() + .try_into() + .map_err(|_| QueueError::Generic("delay is too large".into()))?; + headers.insert("x-delay".into(), AMQPValue::LongUInt(delay_ms)); + + self.channel + .basic_publish( + &self.exchange, + &self.routing_key, + self.options, + payload, + self.properties.clone().with_headers(headers), + ) + .await + .map_err(QueueError::generic)?; + + Ok(()) + } +} + pub struct RabbitMqConsumer { registry: DecoderRegistry>, consumer: Consumer, diff --git a/omniqueue/src/backends/redis/mod.rs b/omniqueue/src/backends/redis/mod.rs index 7762060..501610e 100644 --- a/omniqueue/src/backends/redis/mod.rs +++ b/omniqueue/src/backends/redis/mod.rs @@ -1,3 +1,29 @@ +//! Redis stream-based queue implementation +//! +//! # Redis Streams in Brief +//! Redis has a built-in queue called streams. With consumer groups and consumers, messages in this +//! queue will automatically be put into a pending queue when read and deleted when acknowledged. +//! +//! # The Implementation +//! This implementation uses this to allow worker instances to race for messages to dispatch which +//! are then, ideally, acknowledged. If a message is processing for more than 45 seconds, it is +//! reinserted at the back of the queue to be tried again. +//! +//! This implementation uses the following data structures: +//! - A "tasks to be processed" stream - which is what the consumer listens to for tasks. +//! AKA: Main +//! - A ZSET for delayed tasks with the sort order being the time-to-be-delivered +//! AKA: Delayed +//! +//! The implementation spawns an additional worker that monitors both the zset delayed tasks and +//! the tasks currently processing. It monitors the zset task set for tasks that should be +//! processed now, and the currently processing queue for tasks that have timed out and should be +//! put back on the main queue. + +// This lint warns on `let _: () = ...` which is used throughout this file for Redis commands which +// have generic return types. This is cleaner than the turbofish operator in my opinion. +#![allow(clippy::let_unit_value)] + use std::time::Duration; use std::{any::TypeId, collections::HashMap, marker::PhantomData}; @@ -5,11 +31,14 @@ use async_trait::async_trait; use bb8::ManageConnection; pub use bb8_redis::RedisMultiplexedConnectionManager; use redis::streams::{StreamId, StreamReadOptions, StreamReadReply}; +use svix_ksuid::KsuidLike; +use tokio::task::JoinHandle; use crate::{ decoding::DecoderRegistry, encoding::{CustomEncoder, EncoderRegistry}, queue::{consumer::QueueConsumer, producer::QueueProducer, Acker, Delivery, QueueBackend}, + scheduled::ScheduledProducer, QueueError, }; @@ -42,6 +71,7 @@ pub struct RedisConfig { pub max_connections: u16, pub reinsert_on_nack: bool, pub queue_key: String, + pub delayed_queue_key: String, pub consumer_group: String, pub consumer_name: String, pub payload_key: String, @@ -50,6 +80,8 @@ pub struct RedisConfig { pub struct RedisQueueBackend(PhantomData); pub type RedisClusterQueueBackend = RedisQueueBackend; +type RawPayload = Vec; + #[async_trait] impl QueueBackend for RedisQueueBackend where @@ -57,14 +89,14 @@ where R::Connection: redis::aio::ConnectionLike + Send + Sync, R::Error: 'static + std::error::Error + Send + Sync, { - type Config = RedisConfig; - // FIXME: Is it possible to use the types Redis actually uses? - type PayloadIn = Vec; - type PayloadOut = Vec; + type PayloadIn = RawPayload; + type PayloadOut = RawPayload; type Producer = RedisStreamProducer; + type Consumer = RedisStreamConsumer; + type Config = RedisConfig; async fn new_pair( cfg: RedisConfig, @@ -78,11 +110,20 @@ where .await .map_err(QueueError::generic)?; + let _ = start_scheduler_background_task( + redis.clone(), + &cfg.queue_key, + &cfg.delayed_queue_key, + &cfg.payload_key, + ) + .await; + Ok(( RedisStreamProducer { registry: custom_encoders, redis: redis.clone(), queue_key: cfg.queue_key.clone(), + delayed_queue_key: cfg.delayed_queue_key, payload_key: cfg.payload_key.clone(), }, RedisStreamConsumer { @@ -107,10 +148,18 @@ where .await .map_err(QueueError::generic)?; + let _ = start_scheduler_background_task( + redis.clone(), + &cfg.queue_key, + &cfg.delayed_queue_key, + &cfg.payload_key, + ) + .await; Ok(RedisStreamProducer { registry: custom_encoders, redis, queue_key: cfg.queue_key, + delayed_queue_key: cfg.delayed_queue_key, payload_key: cfg.payload_key, }) } @@ -126,6 +175,13 @@ where .await .map_err(QueueError::generic)?; + let _ = start_scheduler_background_task( + redis.clone(), + &cfg.queue_key, + &cfg.delayed_queue_key, + &cfg.payload_key, + ) + .await; Ok(RedisStreamConsumer { registry: custom_decoders, redis, @@ -137,6 +193,158 @@ where } } +// FIXME(onelson): there's a trait, [`SchedulerBackend`], but no obvious way to implement it in a +// way that makes good sense here. +// We need access to the pool, and various bits of config to spawn a task, but none of that is +// available where it matters right now. +// Doing my own thing for now - standalone function that takes what it needs. +async fn start_scheduler_background_task( + redis: bb8::Pool, + queue_key: &str, + delayed_queue_key: &str, + payload_key: &str, +) -> Option>> +where + R: RedisConnection, + R::Connection: redis::aio::ConnectionLike + Send + Sync, + R::Error: 'static + std::error::Error + Send + Sync, +{ + if delayed_queue_key.is_empty() { + tracing::warn!("no delayed_queue_key specified - delayed task scheduler disabled"); + return None; + } + + Some(tokio::spawn({ + let pool = redis.clone(); + let mqn = queue_key.to_string(); + let dqn = delayed_queue_key.to_string(); + let delayed_lock = format!("{delayed_queue_key}__lock"); + let payload_key = payload_key.to_string(); + tracing::debug!( + "spawning delayed task scheduler: delayed_queue_key=`{delayed_queue_key}`, \ + delayed_lock=`{delayed_lock}`" + ); + + async move { + loop { + if let Err(err) = background_task_delayed( + pool.clone(), + mqn.clone(), + dqn.clone(), + &delayed_lock, + &payload_key, + ) + .await + { + tracing::error!("{}", err); + tokio::time::sleep(Duration::from_millis(500)).await; + continue; + }; + } + } + })) +} + +/// Special ID for XADD command's which generates a stream ID automatically +const GENERATE_STREAM_ID: &str = "*"; +/// Special ID for XREADGROUP commands which reads any new messages +const LISTEN_STREAM_ID: &str = ">"; + +/// Moves "due" messages from a sorted set, where delayed messages are shelved, back onto the main queue. +async fn background_task_delayed( + pool: bb8::Pool, + main_queue_name: String, + delayed_queue_name: String, + delayed_lock: &str, + payload_key: &str, +) -> Result<(), QueueError> +where + R: RedisConnection, + R::Connection: redis::aio::ConnectionLike + Send + Sync, + R::Error: 'static + std::error::Error + Send + Sync, +{ + let batch_size: isize = 50; + + let mut conn = pool.get().await.map_err(QueueError::generic)?; + + // There is a lock on the delayed queue processing to avoid race conditions. So first try to + // acquire the lock should it not already exist. The lock expires after five seconds in case a + // worker crashes while holding the lock. + let mut cmd = redis::cmd("SET"); + cmd.arg(delayed_lock) + .arg(true) + .arg("NX") + .arg("PX") + .arg(5000); + // WIll be Some("OK") when set or None when not set + let resp: Option = cmd + .query_async(&mut *conn) + .await + .map_err(QueueError::generic)?; + + if resp.as_deref() == Some("OK") { + // First look for delayed keys whose time is up and add them to the main queue + let timestamp: i64 = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map_err(QueueError::generic)? + .as_secs() + .try_into() + .map_err(QueueError::generic)?; + + let keys: Vec = redis::Cmd::zrangebyscore_limit( + &delayed_queue_name, + 0isize, + // Subtract 1 from the timestamp to make it exclusive rather than inclusive, + // preventing premature delivery. + timestamp - 1, + 0isize, + batch_size, + ) + .query_async(&mut *conn) + .await + .map_err(QueueError::generic)?; + + if !keys.is_empty() { + // For each task, XADD them to the MAIN queue + let mut pipe = redis::pipe(); + for key in &keys { + let task = from_delayed_queue_key(key)?; + let _ = pipe.xadd(&main_queue_name, GENERATE_STREAM_ID, &[(payload_key, task)]); + } + let _: () = pipe + .query_async(&mut *conn) + .await + .map_err(QueueError::generic)?; + + // Then remove the tasks from the delayed queue so they aren't resent + let _: () = redis::Cmd::zrem(&delayed_queue_name, keys) + .query_async(&mut *conn) + .await + .map_err(QueueError::generic)?; + + // Make sure to release the lock after done processing + let _: () = redis::Cmd::del(delayed_lock) + .query_async(&mut *conn) + .await + .map_err(QueueError::generic)?; + } else { + // Make sure to release the lock before sleeping + let _: () = redis::Cmd::del(delayed_lock) + .query_async(&mut *conn) + .await + .map_err(QueueError::generic)?; + + // Wait for half a second before attempting to fetch again if nothing was found + tokio::time::sleep(Duration::from_millis(500)).await; + } + } else { + // Also sleep half a second if hte lock could not be fetched + tokio::time::sleep(Duration::from_millis(500)).await; + } + + Ok(()) +} + pub struct RedisStreamAcker { redis: bb8::Pool, queue_key: String, @@ -184,6 +392,7 @@ pub struct RedisStreamProducer { registry: EncoderRegistry>, redis: bb8::Pool, queue_key: String, + delayed_queue_key: String, payload_key: String, } @@ -202,11 +411,78 @@ where async fn send_raw(&self, payload: &Vec) -> Result<(), QueueError> { let mut conn = self.redis.get().await.map_err(QueueError::generic)?; - redis::Cmd::xadd(&self.queue_key, "*", &[(&self.payload_key, payload)]) - .query_async(&mut *conn) - .await + redis::Cmd::xadd( + &self.queue_key, + GENERATE_STREAM_ID, + &[(&self.payload_key, payload)], + ) + .query_async(&mut *conn) + .await + .map_err(QueueError::generic)?; + + Ok(()) + } +} + +/// Acts as a payload prefix for when payloads are written to zset keys. +/// +/// This ensures that messages with identical payloads: +/// - don't only get delivered once instead of N times. +/// - don't replace each other's "delivery due" timestamp. +fn delayed_key_id() -> String { + svix_ksuid::Ksuid::new(None, None).to_base62() +} + +/// Prefixes a payload with an id, separated by a pipe, e.g `ID|payload`. +fn to_delayed_queue_key(payload: &RawPayload) -> Result { + Ok(format!( + "{}|{}", + delayed_key_id(), + serde_json::to_string(payload).map_err(QueueError::generic)? + )) +} + +/// Returns the payload portion of a delayed zset key. +fn from_delayed_queue_key(key: &str) -> Result { + // All information is stored in the key in which the ID and JSON formatted task + // are separated by a `|`. So, take the key, then take the part after the `|`. + serde_json::from_str( + key.split('|') + .nth(1) + .ok_or_else(|| QueueError::Generic("Improper key format".into()))?, + ) + .map_err(QueueError::generic) +} + +#[async_trait] +impl ScheduledProducer for RedisStreamProducer +where + M: ManageConnection, + M::Connection: redis::aio::ConnectionLike + Send + Sync, + M::Error: 'static + std::error::Error + Send + Sync, +{ + async fn send_raw_scheduled( + &self, + payload: &Self::Payload, + delay: Duration, + ) -> Result<(), QueueError> { + let timestamp: i64 = (std::time::SystemTime::now() + delay) + .duration_since(std::time::UNIX_EPOCH) + .map_err(QueueError::generic)? + .as_secs() + .try_into() .map_err(QueueError::generic)?; + let mut conn = self.redis.get().await.map_err(QueueError::generic)?; + redis::Cmd::zadd( + &self.delayed_queue_key, + to_delayed_queue_key(payload)?, + timestamp, + ) + .query_async(&mut *conn) + .await + .map_err(QueueError::generic)?; + tracing::trace!("RedisQueue: event sent > (delay: {:?})", delay); Ok(()) } } @@ -260,7 +536,7 @@ where // Ensure an empty vec is never returned let read_out: StreamReadReply = redis::Cmd::xread_options( &[&self.queue_key], - &[">"], + &[LISTEN_STREAM_ID], &StreamReadOptions::default() .group(&self.consumer_group, &self.consumer_name) .block(100_000) @@ -285,7 +561,7 @@ where let read_out: StreamReadReply = redis::Cmd::xread_options( &[&self.queue_key], - &[">"], + &[LISTEN_STREAM_ID], &StreamReadOptions::default() .group(&self.consumer_group, &self.consumer_name) .block( diff --git a/omniqueue/src/backends/sqs.rs b/omniqueue/src/backends/sqs.rs index 5f79ea0..fc6abe5 100644 --- a/omniqueue/src/backends/sqs.rs +++ b/omniqueue/src/backends/sqs.rs @@ -221,7 +221,7 @@ impl ScheduledProducer for SqsQueueProducer { async fn send_raw_scheduled( &self, payload: &Self::Payload, - delay: std::time::Duration, + delay: Duration, ) -> Result<(), QueueError> { self.client .send_message() diff --git a/omniqueue/src/scheduled/mod.rs b/omniqueue/src/scheduled/mod.rs index cd0eed1..371fcc6 100644 --- a/omniqueue/src/scheduled/mod.rs +++ b/omniqueue/src/scheduled/mod.rs @@ -15,7 +15,7 @@ use crate::{ QueueError, QueuePayload, }; -// FIXME(onelson): unused? +// FIXME(onelson): only used by redis -- is this meant to be called internally or by the caller building the backend? #[async_trait] pub trait SchedulerBackend: QueueBackend { async fn start_scheduler_background_task( diff --git a/omniqueue/tests/rabbitmq.rs b/omniqueue/tests/rabbitmq.rs index 8a7efbe..d5e44f0 100644 --- a/omniqueue/tests/rabbitmq.rs +++ b/omniqueue/tests/rabbitmq.rs @@ -1,11 +1,14 @@ +use lapin::options::ExchangeDeclareOptions; +use lapin::types::AMQPValue; use lapin::{ options::{BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions}, types::FieldTable, - BasicProperties, Connection, ConnectionProperties, + BasicProperties, Connection, ConnectionProperties, ExchangeKind, }; use omniqueue::{ backends::rabbitmq::{RabbitMqBackend, RabbitMqConfig}, queue::{consumer::QueueConsumer, producer::QueueProducer, QueueBackend, QueueBuilder, Static}, + scheduled::ScheduledProducer, }; use serde::{Deserialize, Serialize}; use std::time::{Duration, Instant}; @@ -49,10 +52,39 @@ async fn make_test_queue( .await .unwrap(); + const DELAY_EXCHANGE: &str = "later-alligator"; + let mut args = FieldTable::default(); + args.insert( + "x-delayed-type".into(), + AMQPValue::LongString("direct".into()), + ); + channel + .exchange_declare( + DELAY_EXCHANGE, + ExchangeKind::Custom("x-delayed-message".to_string()), + ExchangeDeclareOptions { + auto_delete: true, + ..Default::default() + }, + args, + ) + .await + .unwrap(); + channel + .queue_bind( + &queue_name, + DELAY_EXCHANGE, + &queue_name, + Default::default(), + Default::default(), + ) + .await + .unwrap(); + let config = RabbitMqConfig { uri: MQ_URI.to_owned(), connection_properties: options, - publish_exchange: "".to_owned(), + publish_exchange: DELAY_EXCHANGE.to_string(), publish_routing_key: queue_name.clone(), publish_options: BasicPublishOptions::default(), publish_properties: BasicProperties::default(), @@ -272,3 +304,27 @@ async fn test_send_recv_all_late_arriving_items() { assert!(elapsed >= deadline); assert!(elapsed <= deadline + Duration::from_millis(200)); } + +#[tokio::test] +async fn test_scheduled() { + let payload1 = ExType { a: 1 }; + let (p, mut c) = make_test_queue(None, false) + .await + .build_pair() + .await + .unwrap(); + + let delay = Duration::from_secs(3); + let now = Instant::now(); + p.send_serde_json_scheduled(&payload1, delay).await.unwrap(); + let delivery = c + .receive_all(1, delay * 2) + .await + .unwrap() + .into_iter() + .next() + .unwrap(); + assert!(now.elapsed() >= delay); + assert!(now.elapsed() < delay * 2); + assert_eq!(Some(payload1), delivery.payload_serde_json().unwrap()); +} diff --git a/omniqueue/tests/redis.rs b/omniqueue/tests/redis.rs index a33edb5..8f5f50e 100644 --- a/omniqueue/tests/redis.rs +++ b/omniqueue/tests/redis.rs @@ -1,6 +1,7 @@ use omniqueue::{ backends::redis::{RedisConfig, RedisQueueBackend}, queue::{consumer::QueueConsumer, producer::QueueProducer, QueueBackend, QueueBuilder, Static}, + scheduled::ScheduledProducer, }; use redis::{AsyncCommands, Client, Commands}; use serde::{Deserialize, Serialize}; @@ -42,6 +43,7 @@ async fn make_test_queue() -> (QueueBuilder, RedisStr max_connections: 8, reinsert_on_nack: false, queue_key: stream_name.clone(), + delayed_queue_key: format!("{stream_name}::delayed"), consumer_group: "test_cg".to_owned(), consumer_name: "test_cn".to_owned(), payload_key: "payload".to_owned(), @@ -243,3 +245,25 @@ async fn test_send_recv_all_late_arriving_items() { assert!(elapsed >= deadline); assert!(elapsed <= deadline + Duration::from_millis(200)); } + +#[tokio::test] +async fn test_scheduled() { + let payload1 = ExType { a: 1 }; + let (builder, _drop) = make_test_queue().await; + + let (p, mut c) = builder.build_pair().await.unwrap(); + + let delay = Duration::from_secs(3); + let now = Instant::now(); + p.send_serde_json_scheduled(&payload1, delay).await.unwrap(); + let delivery = c + .receive_all(1, delay * 2) + .await + .unwrap() + .into_iter() + .next() + .unwrap(); + assert!(now.elapsed() >= delay); + assert!(now.elapsed() < delay * 2); + assert_eq!(Some(payload1), delivery.payload_serde_json().unwrap()); +} diff --git a/omniqueue/tests/redis_cluster.rs b/omniqueue/tests/redis_cluster.rs index 0f0f0b4..24683cb 100644 --- a/omniqueue/tests/redis_cluster.rs +++ b/omniqueue/tests/redis_cluster.rs @@ -1,6 +1,7 @@ use omniqueue::{ backends::redis::{RedisClusterQueueBackend, RedisConfig}, queue::{consumer::QueueConsumer, producer::QueueProducer, QueueBackend, QueueBuilder, Static}, + scheduled::ScheduledProducer, }; use redis::{cluster::ClusterClient, AsyncCommands, Commands}; use serde::{Deserialize, Serialize}; @@ -45,6 +46,7 @@ async fn make_test_queue() -> ( max_connections: 8, reinsert_on_nack: false, queue_key: stream_name.clone(), + delayed_queue_key: format!("{stream_name}::delay"), consumer_group: "test_cg".to_owned(), consumer_name: "test_cn".to_owned(), payload_key: "payload".to_owned(), @@ -246,3 +248,25 @@ async fn test_send_recv_all_late_arriving_items() { assert!(elapsed >= deadline); assert!(elapsed <= deadline + Duration::from_millis(200)); } + +#[tokio::test] +async fn test_scheduled() { + let payload1 = ExType { a: 1 }; + + let (builder, _drop) = make_test_queue().await; + let (p, mut c) = builder.build_pair().await.unwrap(); + + let delay = Duration::from_secs(3); + let now = Instant::now(); + p.send_serde_json_scheduled(&payload1, delay).await.unwrap(); + let delivery = c + .receive_all(1, delay * 2) + .await + .unwrap() + .into_iter() + .next() + .unwrap(); + assert!(now.elapsed() >= delay); + assert!(now.elapsed() < delay * 2); + assert_eq!(Some(payload1), delivery.payload_serde_json().unwrap()); +} diff --git a/omniqueue/tests/sqs.rs b/omniqueue/tests/sqs.rs index f696a4e..54987c0 100644 --- a/omniqueue/tests/sqs.rs +++ b/omniqueue/tests/sqs.rs @@ -2,6 +2,7 @@ use aws_sdk_sqs::Client; use omniqueue::{ backends::sqs::{SqsConfig, SqsQueueBackend}, queue::{consumer::QueueConsumer, producer::QueueProducer, QueueBackend, QueueBuilder, Static}, + scheduled::ScheduledProducer, }; use serde::{Deserialize, Serialize}; use std::time::{Duration, Instant}; @@ -223,3 +224,23 @@ async fn test_send_recv_all_late_arriving_items() { assert!(elapsed >= deadline); assert!(elapsed <= deadline + Duration::from_millis(200)); } + +#[tokio::test] +async fn test_scheduled() { + let payload1 = ExType { a: 1 }; + let (p, mut c) = make_test_queue().await.build_pair().await.unwrap(); + + let delay = Duration::from_secs(3); + let now = Instant::now(); + p.send_serde_json_scheduled(&payload1, delay).await.unwrap(); + let delivery = c + .receive_all(1, delay * 2) + .await + .unwrap() + .into_iter() + .next() + .unwrap(); + assert!(now.elapsed() >= delay); + assert!(now.elapsed() < delay * 2); + assert_eq!(Some(payload1), delivery.payload_serde_json().unwrap()); +} diff --git a/testing-docker-compose.yml b/testing-docker-compose.yml index c8b8b6a..a7db678 100644 --- a/testing-docker-compose.yml +++ b/testing-docker-compose.yml @@ -1,10 +1,15 @@ version: "3.7" services: - mq: + rabbitmq: image: rabbitmq:3.11.11-management-alpine ports: - "5672:5672" - "15672:15672" + environment: + RABBITMQ_PLUGINS_DIR: "/opt/rabbitmq/plugins:/usr/lib/rabbitmq/plugins" + volumes: + - ./_rabbit/enabled_plugins:/etc/rabbitmq/enabled_plugins + - ./_rabbit/plugins:/usr/lib/rabbitmq/plugins elasticmq: # Drop-in SQS replacement image: softwaremill/elasticmq:1.3.14