00001
00007 #ifdef WIN32
00008 # pragma warning(disable: 4127 4702) // C4127: conditional expression is constant, C4702: unreachable code
00009 # pragma comment(lib, "ws2_32.lib")
00010 # include <winsock2.h>
00011 # define GetLastSocketError() WSAGetLastError()
00012 # define EWOULDBLOCK WSAEWOULDBLOCK
00013 # define snprintf _snprintf
00014 # define SPRINTF_UINT64 "%I64u"
00015 # define STRTOUL64 _strtoui64
00016 #else
00017 # include <unistd.h>
00018 # include <sys/types.h>
00019 # include <sys/time.h>
00020 # include <sys/socket.h>
00021 # include <sys/ioctl.h>
00022 # include <arpa/inet.h>
00023 # include <netinet/in.h>
00024 # include <errno.h>
00025 # define SOCKET int
00026 # define INVALID_SOCKET -1
00027 # define SOCKET_ERROR -1
00028 # define closesocket close
00029 # define ioctlsocket ioctl
00030 # define GetLastSocketError() errno
00031 # ifndef EWOULDBLOCK
00032 # define EWOULDBLOCK EAGAIN
00033 # endif
00034 # define SPRINTF_UINT64 "%llu"
00035 # define STRTOUL64 strtoull
00036 #endif
00037
00039 #include <stdio.h>
00040 #include <string.h>
00041
00042 #include <vector>
00043 #include <algorithm>
00044 #include <cassert>
00045
00046 #include "MemCacheClient.h"
00047 #include "md5.h"
00048
00054 #define MEMCACHECLIENT_RECONNECT_SEC 60
00055
00057
00058
00059
00061 class ServerSocket
00062 {
00063 private:
00064 const static int MAXBUF = 1024;
00065
00066 SOCKET mSocket;
00067 char mBuf[MAXBUF];
00068 int mIdx;
00069 int mBufLen;
00070
00071 private:
00073 ServerSocket(const ServerSocket &);
00075 ServerSocket & operator=(const ServerSocket &);
00082 int ReceiveBytes(char * a_pszBuf, int a_nBufSiz);
00083
00084 public:
00086 class Exception : public std::exception {
00087 public:
00088 const char * mWhat;
00089
00092 Exception(const char * aWhat = "") { mWhat = aWhat; }
00093 };
00094
00095 public:
00097 ServerSocket();
00098
00100 ~ServerSocket();
00101
00107 bool Connect(unsigned long a_nIpAddress, int a_nPort, int a_nTimeout);
00108
00110 inline bool IsConnected() const { return mSocket != INVALID_SOCKET; }
00111
00113 void Disconnect();
00114
00124 void SendBytes(const char * a_pszBuf, size_t a_nBufSiz);
00125
00132 int GetBytes(char * a_pszBuf, int a_nBufSiz);
00133
00138 void DiscardBytes(int a_nBytes);
00139
00144 inline char GetByte() {
00145 if (mIdx >= mBufLen) {
00146 mIdx = 0;
00147 mBufLen = ReceiveBytes(mBuf, MAXBUF);
00148 }
00149 return mBuf[mIdx++];
00150 }
00151 };
00152
00153 ServerSocket::ServerSocket()
00154 : mSocket(INVALID_SOCKET)
00155 , mIdx(0)
00156 , mBufLen(0)
00157 { }
00158
00159 ServerSocket::~ServerSocket()
00160 {
00161 Disconnect();
00162 }
00163
00164 void
00165 ServerSocket::Disconnect()
00166 {
00167 if (mSocket == INVALID_SOCKET) {
00168 return;
00169 }
00170
00171
00172 shutdown(mSocket, 1);
00173
00174 int nTimeout = 10;
00175 setsockopt(mSocket, SOL_SOCKET, SO_RCVTIMEO,
00176 (const char*) &nTimeout, sizeof(nTimeout));
00177
00178
00179 int rc = 1;
00180 while (rc != SOCKET_ERROR && rc > 0) {
00181 rc = recv(mSocket, mBuf, MAXBUF, 0);
00182 }
00183
00184
00185 closesocket(mSocket);
00186 mSocket = INVALID_SOCKET;
00187
00188
00189 mIdx = mBufLen = 0;
00190 }
00191
00192 bool
00193 ServerSocket::Connect(
00194 unsigned long a_nIpAddress,
00195 int a_nPort,
00196 int a_nTimeout
00197 )
00198 {
00199 Disconnect();
00200
00201 struct sockaddr_in server;
00202 server.sin_family = AF_INET;
00203 server.sin_port = htons((short)a_nPort);
00204 server.sin_addr.s_addr = a_nIpAddress;
00205
00206 SOCKET s = socket(AF_INET, SOCK_STREAM, 0);
00207 if (s == INVALID_SOCKET) return false;
00208
00209 try {
00210
00211 u_long value = 1;
00212 int rc = ioctlsocket(s, FIONBIO, &value);
00213 if (rc != 0) throw rc;
00214
00215 rc = connect(s, (struct sockaddr *) &server, sizeof(server));
00216 if (rc != 0) {
00217 if (rc != SOCKET_ERROR || GetLastSocketError() != EWOULDBLOCK) throw rc;
00218
00219
00220 struct timeval timeout;
00221 timeout.tv_sec = a_nTimeout / 1000;
00222 timeout.tv_usec = (a_nTimeout % 1000) * 1000;
00223 fd_set wr; FD_ZERO(&wr); FD_SET(s, &wr);
00224 fd_set ex; FD_ZERO(&ex); FD_SET(s, &ex);
00225 rc = select(0, NULL, &wr, &ex, &timeout);
00226 if (rc == 0 || rc == SOCKET_ERROR || FD_ISSET(s, &ex)) throw rc;
00227 }
00228
00229
00230 value = 0;
00231 rc = ioctlsocket(s, FIONBIO, &value);
00232 if (rc != 0) throw rc;
00233
00234 rc = setsockopt(s, SOL_SOCKET, SO_RCVTIMEO,
00235 (const char*) &a_nTimeout, sizeof(a_nTimeout));
00236 if (rc != 0) throw rc;
00237
00238 rc = setsockopt(s, SOL_SOCKET, SO_SNDTIMEO,
00239 (const char*) &a_nTimeout, sizeof(a_nTimeout));
00240 if (rc != 0) throw rc;
00241
00242 mSocket = s;
00243 return true;
00244 }
00245 catch (int) {
00246 closesocket(s);
00247 return false;
00248 }
00249 }
00250
00251 void
00252 ServerSocket::SendBytes(
00253 const char * a_pszBuf,
00254 size_t a_nBufSiz
00255 )
00256 {
00257
00258
00259 size_t n = send(mSocket, a_pszBuf, (int) a_nBufSiz, 0);
00260 if (n == a_nBufSiz) return;
00261
00262
00263 Disconnect();
00264 throw ServerSocket::Exception("send error");
00265 }
00266
00267 int
00268 ServerSocket::ReceiveBytes(
00269 char * a_pszBuf,
00270 int a_nBufSiz
00271 )
00272 {
00273 int n = recv(mSocket, a_pszBuf, a_nBufSiz, 0);
00274 if (n > 0) return n;
00275
00276
00277 Disconnect();
00278 throw ServerSocket::Exception("recv error");
00279 }
00280
00281 int
00282 ServerSocket::GetBytes(
00283 char * a_pszBuf,
00284 int a_nBufSiz
00285 )
00286 {
00287 if (mIdx < mBufLen) {
00288 int nLen = mBufLen - mIdx;
00289 if (nLen > a_nBufSiz) nLen = a_nBufSiz;
00290 memcpy(a_pszBuf, mBuf + mIdx, nLen);
00291 mIdx += nLen;
00292 if (mIdx == mBufLen) mIdx = mBufLen = 0;
00293 return nLen;
00294 }
00295 return ReceiveBytes(a_pszBuf, a_nBufSiz);
00296 }
00297
00298 void
00299 ServerSocket::DiscardBytes(
00300 int a_nBytes
00301 )
00302 {
00303 while (a_nBytes > 0) {
00304 if (mIdx == mBufLen) {
00305 mIdx = 0;
00306 mBufLen = ReceiveBytes(mBuf, MAXBUF);
00307 }
00308
00309 int nLen = mBufLen - mIdx;
00310 if (nLen > a_nBytes) {
00311 mIdx += a_nBytes;
00312 break;
00313 }
00314
00315 a_nBytes -= nLen;
00316 mIdx = mBufLen;
00317 }
00318 }
00319
00321
00323 class MemCacheClient::Server : public ServerSocket
00324 {
00326 const static size_t ADDRLEN = sizeof("aaa.bbb.ccc.ddd:PPPPP");
00327
00328 public:
00330 Server() : mIp(INADDR_NONE), mPort(0), mLastConnect(0) { mAddress[0] = 0; }
00331
00335 Server(const Server & rhs) { operator=(rhs); }
00336
00338 ~Server() { }
00339
00343 Server & operator=(const Server & rhs);
00344
00349 bool operator==(const Server & rhs) const;
00350
00355 inline bool operator!=(const Server & rhs) const { return !operator==(rhs); }
00356
00361 bool Set(const char * a_pszServer);
00362
00367 bool Connect(int a_nTimeout);
00368
00376 inline const char * GetAddress() const { return mAddress; }
00377
00378 private:
00379 char mAddress[ADDRLEN];
00380 unsigned long mIp;
00381 int mPort;
00382 time_t mLastConnect;
00383 };
00384
00385 MemCacheClient::Server &
00386 MemCacheClient::Server::operator=(
00387 const Server & rhs
00388 )
00389 {
00390 strcpy(mAddress, rhs.mAddress);
00391 mIp = rhs.mIp;
00392 mPort = rhs.mPort;
00393 mLastConnect = 0;
00394 return *this;
00395 }
00396
00397 bool
00398 MemCacheClient::Server::operator==(
00399 const Server & rhs
00400 ) const
00401 {
00402 return mIp == rhs.mIp && mPort == rhs.mPort;
00403 }
00404
00405 bool
00406 MemCacheClient::Server::Set(
00407 const char * a_pszServer
00408 )
00409 {
00410 if (!a_pszServer || !*a_pszServer) return false;
00411
00412 size_t nLen = strlen(a_pszServer);
00413 if (nLen >= ADDRLEN) return false;
00414 strcpy(mAddress, a_pszServer);
00415
00416 mPort = 11211;
00417 char * pszPort = strchr(mAddress, ':');
00418 if (pszPort) {
00419 mPort = atoi(pszPort + 1);
00420 *pszPort = 0;
00421 }
00422
00423 mIp = inet_addr(mAddress);
00424 if (mIp == INADDR_NONE) return false;
00425
00426 struct in_addr addr;
00427 addr.s_addr = mIp;
00428 snprintf(mAddress, ADDRLEN, "%s:%d", inet_ntoa(addr), mPort);
00429
00430 return true;
00431 }
00432
00433 bool
00434 MemCacheClient::Server::Connect(
00435 int a_nTimeout
00436 )
00437 {
00438
00439 if (IsConnected()) {
00440 return true;
00441 }
00442
00443
00444 time_t nNow;
00445 #ifdef WIN32
00446 nNow = GetTickCount();
00447 if (nNow - mLastConnect < MEMCACHECLIENT_RECONNECT_SEC * 1000) {
00448 #else
00449 time(&nNow);
00450 if (nNow - mLastConnect < MEMCACHECLIENT_RECONNECT_SEC) {
00451 #endif
00452 return false;
00453 }
00454 mLastConnect = nNow;
00455
00456 return ServerSocket::Connect(mIp, mPort, a_nTimeout);
00457 }
00458
00460
00461
00462 MemCacheClient::MemCacheClient()
00463 : m_nTimeoutMs(1000)
00464 {
00465 }
00466
00467 MemCacheClient::MemCacheClient(
00468 const MemCacheClient & rhs
00469 )
00470 {
00471 operator=(rhs);
00472 }
00473
00474 MemCacheClient &
00475 MemCacheClient::operator=(
00476 const MemCacheClient & rhs
00477 )
00478 {
00479 m_nTimeoutMs = rhs.m_nTimeoutMs;
00480 ClearServers();
00481 m_rgpServer.resize(rhs.m_rgpServer.size());
00482 for (size_t n = 0; n < rhs.m_rgpServer.size(); ++n) {
00483 m_rgpServer[n] = new Server(*rhs.m_rgpServer[n]);
00484 if (!m_rgpServer[n]) throw std::bad_alloc();
00485 }
00486 return *this;
00487 }
00488
00489 MemCacheClient::~MemCacheClient()
00490 {
00491 ClearServers();
00492 }
00493
00494 void
00495 MemCacheClient::ClearServers()
00496 {
00497 for (size_t n = 0; n < m_rgpServer.size(); ++n) {
00498 delete m_rgpServer[n];
00499 }
00500 m_rgpServer.clear();
00501 }
00502
00503 bool
00504 MemCacheClient::AddServer(
00505 const char * a_pszServer
00506 )
00507 {
00508
00509
00510
00511
00512 Server * pServer = new Server;
00513 if (!pServer->Set(a_pszServer)) {
00514 delete pServer;
00515 return false;
00516 }
00517 for (size_t n = 0; n < m_rgpServer.size(); ++n) {
00518 if (*pServer == *m_rgpServer[n]) return true;
00519 }
00520 m_rgpServer.push_back(pServer);
00521
00522
00523
00524
00525 static const char * rgpSalt[] = {
00526 "{DD4C855D-7548-4804-8F1A-166CDBACEFE7}",
00527 "{9BF02198-1D29-4aa3-9466-A4AF4372D5B1}",
00528 "{0F20CD2F-ACF2-44bc-8CE3-54529D7B738D}",
00529 "{DEA60AAB-CFF9-4a20-A799-4E5E93369656}",
00530 "{C05167CC-57DA-40f2-9EB8-18F65E56FD21}",
00531 "{57939537-0966-49e7-B675-ACE63246BFA5}",
00532 "{F0C8BE5C-A0F1-478f-BC45-28D42AF0CA1E}"
00533 };
00534
00535 string_t sKey;
00536 ConsistentHash entry(0, pServer);
00537 for (size_t n = 0; n < sizeof(rgpSalt)/sizeof(rgpSalt[0]); ++n) {
00538 sKey = rgpSalt[n];
00539 sKey += pServer->GetAddress();
00540 entry.mHash = CreateKeyHash(sKey.data());
00541 m_rgServerHash.push_back(entry);
00542 }
00543
00544
00545 std::sort(m_rgServerHash.begin(), m_rgServerHash.end());
00546
00547 #if 0
00548 printf("\nSERVER RING (%d servers):\n", m_rgpServer.size());
00549 for (size_t n = 0; n < m_rgServerHash.size(); ++n) {
00550 printf("%08x = %s\n", m_rgServerHash[n].mHash,
00551 m_rgServerHash[n].mServer->GetAddress());
00552 }
00553 #endif
00554
00555 return true;
00556 }
00557
00559 struct MemCacheClient::ConsistentHash::MatchServer
00560 {
00562 MemCacheClient::Server * mServer;
00566 MatchServer(MemCacheClient::Server * aServer) : mServer(aServer) { }
00570 bool operator()(const ConsistentHash & rhs) const { return rhs.mServer == mServer; }
00571 };
00572
00573 bool
00574 MemCacheClient::DelServer(
00575 const char * a_pszServer
00576 )
00577 {
00578 Server test;
00579 if (test.Set(a_pszServer)) {
00580 std::vector<Server*>::iterator i = m_rgpServer.begin();
00581 for (; i != m_rgpServer.end(); ++i) {
00582 Server * pServer = *i;
00583 if (test != *pServer) continue;
00584
00585 delete pServer;
00586 m_rgpServer.erase(i);
00587 ConsistentHash::MatchServer server(pServer);
00588 m_rgServerHash.erase(
00589 std::partition(m_rgServerHash.begin(), m_rgServerHash.end(), server),
00590 m_rgServerHash.end());
00591 std::sort(m_rgServerHash.begin(), m_rgServerHash.end());
00592 return true;
00593 }
00594 }
00595
00596
00597 return false;
00598 }
00599
00600 void
00601 MemCacheClient::GetServers(
00602 std::vector<string_t> & a_rgServers
00603 )
00604 {
00605 a_rgServers.clear();
00606 a_rgServers.reserve(m_rgpServer.size());
00607 for (size_t n = 0; n < m_rgpServer.size(); ++n) {
00608 a_rgServers.push_back(m_rgpServer[n]->GetAddress());
00609 }
00610 }
00611
00612 void
00613 MemCacheClient::SetTimeout(
00614 int a_nMilliseconds
00615 )
00616 {
00617 m_nTimeoutMs = a_nMilliseconds;
00618 }
00619
00620 unsigned long
00621 MemCacheClient::CreateKeyHash(
00622 const char * a_pszKey
00623 )
00624 {
00625 union {
00626 char as_char[16];
00627 unsigned long as_long[4];
00628 } output;
00629 assert(sizeof(output.as_char) == MD5_HASHSIZE);
00630 assert(sizeof(output.as_char) == sizeof(output.as_long));
00631
00632 md5(a_pszKey, (long) strlen(a_pszKey), output.as_char);
00633 return output.as_long[0];
00634 }
00635
00636 MemCacheClient::Server *
00637 MemCacheClient::FindServer(
00638 const string_t & a_sKey
00639 )
00640 {
00641
00642 if (m_rgServerHash.empty()) {
00643 return NULL;
00644 }
00645
00646
00647 ConsistentHash hash(CreateKeyHash(a_sKey.data()), NULL);
00648 std::vector<ConsistentHash>::iterator iServer =
00649 std::lower_bound(m_rgServerHash.begin(), m_rgServerHash.end(), hash);
00650 if (iServer == m_rgServerHash.end()) {
00651 iServer = m_rgServerHash.begin();
00652 }
00653
00654
00655 Server * pServer = iServer->mServer;
00656 if (!pServer->Connect(m_nTimeoutMs)) {
00657 return NULL;
00658 }
00659 return pServer;
00660 }
00661
00663 struct MemCacheClient::MemRequest::Sort
00664 {
00670 bool operator()(const MemRequest * pl, const MemRequest * pr) const {
00671 return pl->mServer < pr->mServer;
00672 }
00673 };
00674
00675 int
00676 MemCacheClient::Combine(
00677 const char * a_pszType,
00678 MemRequest * a_rgItem,
00679 int a_nCount
00680 )
00681 {
00682 MemRequest * rgpItem[MAX_REQUESTS] = { NULL };
00683 if (a_nCount > MAX_REQUESTS) return -1;
00684
00685
00686 int nItemCount = 0;
00687 for (int n = 0; n < a_nCount; ++n) {
00688 a_rgItem[n].mServer = FindServer(a_rgItem[n].mKey);
00689 if (a_rgItem[n].mServer) {
00690 rgpItem[nItemCount++] = &a_rgItem[n];
00691 }
00692 else {
00693 a_rgItem[n].mResult = MCERR_NOSERVER;
00694 }
00695 }
00696 if (nItemCount == 0) return 0;
00697
00698
00699 const static MemRequest::Sort sortOnServer = MemRequest::Sort();
00700 std::sort(&rgpItem[0], &rgpItem[nItemCount], sortOnServer);
00701
00702
00703 char szBuf[50];
00704 int nItem = 0, nNext;
00705 string_t sRequest, sTemp;
00706 while (nItem < nItemCount) {
00707 for (nNext = nItem; nNext < nItemCount; ++nNext) {
00708 if (rgpItem[nItem]->mServer != rgpItem[nNext]->mServer) break;
00709
00710
00711 if (*a_pszType == 'g') {
00712 if (nNext == nItem) sRequest = "get";
00713 else sRequest.resize(sRequest.length() - 2);
00714 sRequest += ' ';
00715 sRequest += rgpItem[nNext]->mKey;
00716 sRequest += "\r\n";
00717 rgpItem[nNext]->mResult = MCERR_NOTFOUND;
00718 }
00719
00720 else if (*a_pszType == 'd') {
00721
00722 sRequest += "delete ";
00723 sRequest += rgpItem[nNext]->mKey;
00724 sRequest += ' ';
00725 snprintf(szBuf, sizeof(szBuf), "%ld", (long) rgpItem[nNext]->mExpiry);
00726 sRequest += szBuf;
00727 if (rgpItem[nNext]->mResult == MCERR_NOREPLY) {
00728 sRequest += " noreply";
00729 }
00730 sRequest += "\r\n";
00731 if (rgpItem[nNext]->mResult != MCERR_NOREPLY) {
00732 rgpItem[nNext]->mResult = MCERR_NOTFOUND;
00733 }
00734 }
00735 }
00736
00737
00738
00739 try {
00740 rgpItem[nItem]->mServer->SendBytes(
00741 sRequest.data(), sRequest.length());
00742 }
00743 catch (const ServerSocket::Exception &) {
00744 for (int n = nItem; n < nNext; ++n) {
00745 rgpItem[n]->mServer = NULL;
00746 rgpItem[n]->mResult = MCERR_NOSERVER;
00747 }
00748 }
00749 nItem = nNext;
00750 }
00751
00752
00753 int nResponses = 0;
00754 for (nItem = 0; nItem < nItemCount; nItem = nNext) {
00755
00756 if (!rgpItem[nItem]->mServer) { nNext = nItem + 1; continue; }
00757 for (nNext = nItem + 1; nNext < nItemCount; ++nNext) {
00758 if (rgpItem[nItem]->mServer != rgpItem[nNext]->mServer) break;
00759 }
00760
00761
00762
00763 try {
00764 if (*a_pszType == 'g') {
00765 nResponses += HandleGetResponse(
00766 rgpItem[nItem]->mServer,
00767 &rgpItem[nItem], &rgpItem[nNext]);
00768 }
00769 else if (*a_pszType == 'd') {
00770 nResponses += HandleDelResponse(
00771 rgpItem[nItem]->mServer,
00772 &rgpItem[nItem], &rgpItem[nNext]);
00773 }
00774 }
00775 catch (const ServerSocket::Exception &) {
00776 rgpItem[nItem]->mServer->Disconnect();
00777 for (int n = nNext - 1; n >= nItem; --n) {
00778 if (rgpItem[nItem]->mServer != rgpItem[n]->mServer) continue;
00779 rgpItem[n]->mServer = NULL;
00780 rgpItem[n]->mResult = MCERR_NOSERVER;
00781 }
00782 }
00783 }
00784
00785 return nResponses;
00786 }
00787
00788 int
00789 MemCacheClient::HandleGetResponse(
00790 Server * a_pServer,
00791 MemRequest ** a_ppBegin,
00792 MemRequest ** a_ppEnd
00793 )
00794 {
00795 int nFound = 0;
00796
00797 string_t sValue;
00798 for (;;) {
00799
00800 sValue = a_pServer->GetByte();
00801 while (sValue[sValue.length()-1] != '\n') {
00802 sValue += a_pServer->GetByte();
00803 }
00804 if (sValue == "END\r\n") break;
00805
00806
00807 if (0 != strncmp(sValue.data(), "VALUE ", 6)) {
00808 throw ServerSocket::Exception("bad response");
00809 }
00810
00811
00812 int n = (int) sValue.find(' ', 6);
00813 if (n < 1) throw ServerSocket::Exception("bad response");
00814 string_t sKey(sValue, 6, n - 6);
00815
00816
00817 char * pVal = const_cast<char*>(sValue.data() + n + 1);
00818 unsigned nFlags = (unsigned) strtoul(pVal, &pVal, 10);
00819 if (*pVal++ != ' ') throw ServerSocket::Exception("bad response");
00820
00821
00822 unsigned nBytes = (unsigned) strtoul(pVal, &pVal, 10);
00823 if (*pVal != ' ' && *pVal != '\r') throw ServerSocket::Exception("bad response");
00824
00825
00826 MemRequest * pItem = NULL;
00827 for (MemRequest ** p = a_ppBegin; p < a_ppEnd; ++p) {
00828 if ((*p)->mKey == sKey) { pItem = *p; break; }
00829 }
00830 if (!pItem) {
00831 a_pServer->DiscardBytes(nBytes + 2);
00832 continue;
00833 }
00834 pItem->mFlags = nFlags;
00835
00836
00837 if (*pVal == ' ') {
00838 pItem->mCas = STRTOUL64(++pVal, &pVal, 10);
00839 if (*pVal != '\r') throw ServerSocket::Exception("bad response");
00840 }
00841
00842
00843 while (nBytes > 0) {
00844 char * pBuf = pItem->mData.GetWriteBuffer(nBytes);
00845 int nReceived = a_pServer->GetBytes(pBuf, nBytes);
00846 pItem->mData.CommitWriteBytes(nReceived);
00847 nBytes -= nReceived;
00848 }
00849 pItem->mResult = MCERR_OK;
00850
00851
00852 if ('\r' != a_pServer->GetByte() ||
00853 '\n' != a_pServer->GetByte())
00854 {
00855 throw ServerSocket::Exception("bad response");
00856 }
00857
00858 ++nFound;
00859 }
00860
00861 return nFound;
00862 }
00863
00864 int
00865 MemCacheClient::HandleDelResponse(
00866 Server * a_pServer,
00867 MemRequest ** a_ppBegin,
00868 MemRequest ** a_ppEnd
00869 )
00870 {
00871 string_t sValue;
00872 int nResponses = 0;
00873 for (MemRequest ** p = a_ppBegin; p < a_ppEnd; ++p) {
00874 MemRequest * pItem = *p;
00875
00876
00877 if (pItem->mResult == MCERR_NOREPLY) continue;
00878
00879
00880 sValue = a_pServer->GetByte();
00881 while (sValue[sValue.length()-1] != '\n') {
00882 sValue += a_pServer->GetByte();
00883 }
00884
00885
00886 if (sValue == "DELETED\r\n") {
00887 pItem->mResult = MCERR_OK;
00888 ++nResponses;
00889 continue;
00890 }
00891
00892
00893 if (sValue == "NOT_FOUND\r\n") {
00894 pItem->mResult = MCERR_NOTFOUND;
00895 ++nResponses;
00896 continue;
00897 }
00898
00899 a_pServer->Disconnect();
00900 throw ServerSocket::Exception("bad response");
00901 }
00902
00903 return nResponses;
00904 }
00905
00906 MCResult
00907 MemCacheClient::IncDec(
00908 const char * a_pszType,
00909 const char * a_pszKey,
00910 uint64_t * a_pnNewValue,
00911 uint64_t a_nDiff,
00912 bool a_bWantReply
00913 )
00914 {
00915 Server * pServer = FindServer(a_pszKey);
00916 if (!pServer) return MCERR_NOSERVER;
00917
00918 char szBuf[50];
00919 string_t sRequest(a_pszType);
00920 sRequest += ' ';
00921 sRequest += a_pszKey;
00922 snprintf(szBuf, sizeof(szBuf), " " SPRINTF_UINT64, a_nDiff);
00923 sRequest += szBuf;
00924 if (!a_bWantReply) {
00925 sRequest += " noreply";
00926 }
00927 sRequest += "\r\n";
00928
00929 try {
00930 pServer->SendBytes(sRequest.data(), sRequest.length());
00931
00932 if (!a_bWantReply) {
00933 return MCERR_NOREPLY;
00934 }
00935
00936 string_t sValue;
00937 sValue = pServer->GetByte();
00938 while (sValue[sValue.length()-1] != '\n') {
00939 sValue += pServer->GetByte();
00940 }
00941
00942 if (sValue == "NOT_FOUND\r\n") {
00943 return MCERR_NOTFOUND;
00944 }
00945
00946 if (a_pnNewValue) {
00947 *a_pnNewValue = STRTOUL64(sValue.data(), NULL, 10);
00948 }
00949 return MCERR_OK;
00950 }
00951 catch (const ServerSocket::Exception &) {
00952 pServer->Disconnect();
00953 return MCERR_NOSERVER;
00954 }
00955 }
00956
00957 int
00958 MemCacheClient::Store(
00959 const char * a_pszType,
00960 MemRequest * a_rgItem,
00961 int a_nCount
00962 )
00963 {
00964
00965
00966
00967 for (int n = 0; n < a_nCount; ++n) {
00968 a_rgItem[n].mServer = FindServer(a_rgItem[n].mKey);
00969 if (!a_rgItem[n].mServer) {
00970 a_rgItem[n].mResult = MCERR_NOSERVER;
00971 }
00972 }
00973
00974 char szBuf[50];
00975 int nResponses = 0;
00976 string_t sRequest;
00977 for (int n = 0; n < a_nCount; ++n) {
00978 if (!a_rgItem[n].mServer) continue;
00979
00980
00981 sRequest = a_pszType;
00982 sRequest += ' ';
00983 sRequest += a_rgItem[n].mKey;
00984 snprintf(szBuf, sizeof(szBuf), " %u %ld %u",
00985 a_rgItem[n].mFlags, (long) a_rgItem[n].mExpiry,
00986 a_rgItem[n].mData.GetReadSize());
00987 sRequest += szBuf;
00988 if (*a_pszType == 'c') {
00989 snprintf(szBuf, sizeof(szBuf), " " SPRINTF_UINT64, a_rgItem[n].mCas);
00990 sRequest += szBuf;
00991 }
00992 if (a_rgItem[n].mResult == MCERR_NOREPLY) {
00993 sRequest += " noreply";
00994 }
00995 sRequest += "\r\n";
00996
00997
00998
00999 try {
01000 a_rgItem[n].mServer->SendBytes(
01001 sRequest.data(), sRequest.length());
01002 a_rgItem[n].mServer->SendBytes(
01003 a_rgItem[n].mData.GetReadBuffer(),
01004 a_rgItem[n].mData.GetReadSize());
01005 a_rgItem[n].mServer->SendBytes("\r\n", 2);
01006
01007
01008 a_rgItem[n].mData.CommitReadBytes(
01009 a_rgItem[n].mData.GetReadSize());
01010
01011
01012 if (a_rgItem[n].mResult == MCERR_NOREPLY) {
01013 continue;
01014 }
01015
01016
01017 HandleStoreResponse(a_rgItem[n].mServer, a_rgItem[n]);
01018 ++nResponses;
01019 }
01020 catch (const ServerSocket::Exception &) {
01021 for (int i = a_nCount - 1; i >= n; --i) {
01022 if (a_rgItem[n].mServer != a_rgItem[i].mServer) continue;
01023 a_rgItem[i].mServer = NULL;
01024 a_rgItem[i].mResult = MCERR_NOSERVER;
01025 }
01026 continue;
01027 }
01028 }
01029
01030 return nResponses;
01031 }
01032
01033 void
01034 MemCacheClient::HandleStoreResponse(
01035 Server * a_pServer,
01036 MemRequest & a_oItem
01037 )
01038 {
01039
01040 string_t sValue;
01041 sValue = a_pServer->GetByte();
01042 while (sValue[sValue.length()-1] != '\n') {
01043 sValue += a_pServer->GetByte();
01044 }
01045
01046
01047 if (sValue == "STORED\r\n") {
01048 a_oItem.mResult = MCERR_OK;
01049 return;
01050 }
01051
01052
01053
01054
01055
01056 if (sValue == "NOT_STORED\r\n") {
01057 a_oItem.mResult = MCERR_NOTSTORED;
01058 return;
01059 }
01060
01061
01062 a_pServer->Disconnect();
01063 throw ServerSocket::Exception("bad response");
01064 }
01065
01066 int
01067 MemCacheClient::FlushAll(
01068 const char * a_pszServer,
01069 int a_nExpiry
01070 )
01071 {
01072 char szRequest[50];
01073 snprintf(szRequest, sizeof(szRequest),
01074 "flush_all %u\r\n", a_nExpiry);
01075
01076 Server test;
01077 if (a_pszServer && !test.Set(a_pszServer)) return false;
01078
01079 int nSuccess = 0;
01080 for (size_t n = 0; n < m_rgpServer.size(); ++n) {
01081 Server * pServer = m_rgpServer[n];
01082 if (a_pszServer && *pServer != test) continue;
01083
01084
01085 if (!pServer->Connect(m_nTimeoutMs)) {
01086 continue;
01087 }
01088
01089 try {
01090
01091 pServer->SendBytes(szRequest, strlen(szRequest));
01092
01093
01094 string_t sValue;
01095 sValue = pServer->GetByte();
01096 while (sValue[sValue.length()-1] != '\n') {
01097 sValue += pServer->GetByte();
01098 }
01099 if (sValue == "OK\r\n") {
01100
01101 ++nSuccess;
01102 }
01103 else {
01104
01105 pServer->Disconnect();
01106 }
01107 }
01108 catch (const ServerSocket::Exception &) {
01109
01110 }
01111 }
01112
01113 return nSuccess;
01114 }