队列 + 多线程 + Couchbase 缓存，解决高并发问题。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Web;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using GStreamCloud.Common;
using MyGStreamcloud.Common.Utils; namespace RedPacketWebAPI.Models
{
/// <summary>
/// Work item placed on <see cref="TestQueue"/> for the background worker.
/// </summary>
[Serializable]
public class Queueparam
{
    /// <summary>
    /// Numeric id of the item. The worker uses its string form as the key of
    /// the published result and of the Couchbase counters it decrements.
    /// </summary>
    public int uId { get; set; }

    /// <summary>Name carried with the item; used to build the response message.</summary>
    public string Name { get; set; }
}

/// <summary>
/// Result handed back to the caller once a queued item has been processed.
/// </summary>
[Serializable]
public class TestResponse
{
    /// <summary>Numeric status code of the operation.</summary>
    public int Code { get; set; }

    /// <summary>Human-readable message accompanying <see cref="Code"/>.</summary>
    public string Message { get; set; }
}

/// <summary>
/// Process-wide request queue plus the table of finished results.
/// All access to <see cref="queuelist"/> and <see cref="output"/> made through
/// this class is serialised on <see cref="lockqueue"/>.
/// </summary>
[Serializable]
public class TestQueue
{
    /// <summary>Finished results, keyed by string; entries are removed by <see cref="popValue"/>.</summary>
    public static Dictionary<string, TestResponse> output = new Dictionary<string, TestResponse>();

    /// <summary>The single worker thread that drains <see cref="queuelist"/>.</summary>
    public static Thread t;

    /// <summary>Pending work items in FIFO order.</summary>
    public static Queue<Queueparam> queuelist = new Queue<Queueparam>();

    /// <summary>
    /// Monitor object guarding the queue and the result table.
    /// It is initialised here because locking on a null reference throws
    /// ArgumentNullException. The field stays writable so existing code that
    /// assigns it keeps compiling.
    /// </summary>
    public static object lockqueue = new object();

    /// <summary>
    /// Makes sure the worker thread exists and is running. Constructing further
    /// instances is harmless: the thread is created and started only once.
    /// </summary>
    public TestQueue()
    {
        lock (lockqueue)
        {
            if (t == null)
            {
                t = new Thread(new DoQueueProcess().ThreadDo);
                // The worker loops until told to stop; as a background thread it
                // cannot keep the host process alive on shutdown.
                t.IsBackground = true;
                t.Start();
            }
        }
    }

    /// <summary>Appends a work item to the tail of the queue.</summary>
    /// <param name="p">Item to enqueue.</param>
    public static void Enqueue(Queueparam p)
    {
        lock (lockqueue)
        {
            queuelist.Enqueue(p);
        }
    }

    /// <summary>Removes and returns the item at the head of the queue.</summary>
    /// <returns>The next item, or <c>null</c> when the queue is empty.</returns>
    public static Queueparam Dequeue()
    {
        lock (lockqueue)
        {
            return queuelist.Count > 0 ? queuelist.Dequeue() : null;
        }
    }

    /// <summary>Removes and returns the finished result stored under <paramref name="key"/>.</summary>
    /// <param name="key">Key the result was published under.</param>
    /// <returns>The stored result, or <c>null</c> when nothing is stored under that key.</returns>
    public static TestResponse popValue(string key)
    {
        lock (lockqueue)
        {
            TestResponse resp;
            // Single lookup instead of ContainsKey followed by the indexer.
            if (output.TryGetValue(key, out resp))
            {
                output.Remove(key);
                return resp;
            }
            return null;
        }
    }
}
/// <summary>
/// Body of the worker thread: takes items from <see cref="TestQueue"/>,
/// processes them and publishes each result in <c>TestQueue.output</c>.
/// </summary>
public class DoQueueProcess
{
    // NOTE(review): the original sleep interval was lost from the source;
    // 10 ms is a placeholder - confirm the intended value.
    private const int IdleSleepMilliseconds = 10;

    /// <summary>Set to <c>true</c> to make <see cref="ThreadDo"/> leave its loop.</summary>
    /// <remarks>Volatile because it is written by one thread and polled by the worker.</remarks>
    public volatile bool stop = false;

    /// <summary>
    /// Loops until <see cref="stop"/> is set, handling at most one queued item
    /// per iteration and sleeping between iterations.
    /// </summary>
    public void ThreadDo()
    {
        while (!stop)
        {
            try
            {
                Queueparam param = TestQueue.Dequeue();
                if (param != null)
                {
                    DoManageSomething todo = new DoManageSomething();
                    // Manage takes the whole work item, not just its name.
                    TestResponse resp = todo.Manage(param);

                    // Publish under the same monitor popValue uses. The indexer
                    // overwrites a stale entry, whereas Add would throw on a
                    // repeated key.
                    lock (TestQueue.lockqueue)
                    {
                        TestQueue.output[param.uId.ToString()] = resp;
                    }
                }
            }
            catch (Exception ex)
            {
                // One failing item must not kill the worker, but the failure
                // should be visible rather than silently discarded.
                Trace.TraceError("DoQueueProcess.ThreadDo failed: " + ex);
            }

            try
            {
                Thread.Sleep(IdleSleepMilliseconds);
            }
            catch (ThreadInterruptedException)
            {
                // Interrupted while idle: fall through and re-check the stop flag.
            }
        }
    }
}

/// <summary>
/// Performs the business processing for one queued item.
/// </summary>
public class DoManageSomething
{
    /// <summary>
    /// Builds the success response for <paramref name="cpara"/> and decrements
    /// its Couchbase counters.
    /// </summary>
    /// <param name="cpara">Work item to process; must not be null.</param>
    /// <returns>A response whose message is the item's name followed by the success text.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="cpara"/> is null.</exception>
    public TestResponse Manage(Queueparam cpara)
    {
        if (cpara == null)
        {
            throw new ArgumentNullException("cpara");
        }

        // TODO: concrete business processing goes here.
        TestResponse resp = new TestResponse();
        // NOTE(review): the original code literal was lost from the source;
        // 0 is assumed to mean success - confirm.
        resp.Code = 0;
        resp.Message = cpara.Name + "处理成功";
        disposeCoucheBase(cpara);
        return resp;
    }

    /// <summary>
    /// Decrements the TAG_CLAIMCOUNT and TAG_CLAIMCOUNT_USER counters for the
    /// item's id.
    /// </summary>
    /// <param name="cpara">Work item whose <c>uId</c> identifies the counters.</param>
    public void disposeCoucheBase(Queueparam cpara)
    {
        string counterKey = cpara.uId.ToString();
        CouchBaseProvider.Decrement(Constants.TAG_CLAIMCOUNT, counterKey);
        CouchBaseProvider.Decrement(Constants.TAG_CLAIMCOUNT_USER, counterKey);
    }
}

/// <summary>
/// Lets a caller wait asynchronously for the worker to publish a result.
/// </summary>
public class MonitorThread
{
    // NOTE(review): the original polling interval was lost from the source;
    // 10 ms is a placeholder - confirm the intended value.
    private const int PollIntervalMilliseconds = 10;

    /// <summary>
    /// Polls <c>TestQueue.popValue</c> until a result for <paramref name="UID"/>
    /// appears or <paramref name="timeOut"/> milliseconds have elapsed.
    /// </summary>
    /// <param name="UID">Key the result is published under.</param>
    /// <param name="timeOut">Maximum time to wait, in milliseconds.</param>
    /// <returns>
    /// The published result, or a default-constructed <see cref="TestResponse"/>
    /// when the wait times out.
    /// </returns>
    public static async Task<TestResponse> WaitForReslut(string UID, int timeOut)
    {
        Stopwatch sw = Stopwatch.StartNew();

        // Keep polling WHILE the deadline has not passed.
        while (sw.ElapsedMilliseconds < timeOut)
        {
            // popValue does the locked lookup-and-remove and returns null when
            // nothing is there yet, so no exception is needed for "not ready".
            TestResponse ready = TestQueue.popValue(UID);
            if (ready != null)
            {
                return ready;
            }

            // Yield the thread between polls instead of blocking a pool thread.
            await Task.Delay(PollIntervalMilliseconds).ConfigureAwait(false);
        }

        // One last attempt covers a result published during the final delay
        // (and a non-positive timeOut); otherwise return the empty response.
        return TestQueue.popValue(UID) ?? new TestResponse();
    }
}
}
调用处:
// NOTE(review): every numeric literal in this snippet was lost from the source
// (Increment arguments, the TimeSpan, the comparison threshold, the error codes
// and maxWait). The values below are placeholders - confirm each one.

// Couchbase atomic increment of the per-user counter. The same constant is used
// here as in the matching Decrement so both operate on the same counter.
ulong c = CouchBaseProvider.Increment(Constants.TAG_CLAIMCOUNT_USER, userId.ToString(), 1, 1, new TimeSpan(0, 10, 0));
if (c > 1) // Couchbase check: has this user already claimed?
{
    resp.Code = -1;
    resp.Message = "您已在队列里面";
}
else // Couchbase check: have all slots been claimed?
{
    int nums = Convert.ToInt32(CouchBaseProvider.Increment(Constants.TAG_CLAIMCOUNT, userId.ToString()));
    if (nums > Allnums)
    {
        resp.Code = -2;
        resp.Message = "人数已满";
    }
    else
    {
        // Enter the queue. Queueparam only declares uId and Name, and the
        // worker publishes the result under uId.ToString(), so the item is
        // keyed by the user id and the wait below uses that same key.
        // NOTE(review): the original also assigned p.redOrder, which Queueparam
        // does not declare; add that property to Queueparam if it is needed.
        Queueparam p = new Queueparam();
        p.uId = Convert.ToInt32(userId);
        p.Name = userName;
        TestQueue.Enqueue(p);

        int maxWait = 5000; // milliseconds

        // Instantiating TestQueue runs its constructor, which is where the
        // worker thread is set up.
        TestQueue queue = new TestQueue();
        resp = await MonitorThread.WaitForReslut(p.uId.ToString(), maxWait);
    }
}