| author | Tomas Zeman <tzeman@volny.cz> |
| Thu, 05 May 2016 16:26:07 +0200 | |
| changeset 5 | 141e76d946e3 |
| parent 4 | b25c02bd6b11 |
| permissions | -rw-r--r-- |
| 0 | 1 |
package couchdb.changes |
2 |
||
| 4 | 3 |
import akka.actor.{ActorRef, Actor, ActorLogging, Status}
|
4 |
import akka.io.IO |
|
5 |
||
6 |
import scala.concurrent.ExecutionContext.Implicits.global |
|
7 |
import scala.language.postfixOps |
|
8 |
import scala.language.reflectiveCalls |
|
9 |
import scala.util.Try |
|
10 |
||
11 |
import spray.can.Http |
|
12 |
import spray.can.Http.ConnectionAttemptFailedException |
|
13 |
import spray.client.pipelining._ |
|
| 0 | 14 |
import spray.httpx.unmarshalling.{MalformedContent, Unmarshaller, Deserialized}
|
15 |
import spray.http._ |
|
16 |
import spray.json._ |
|
17 |
||
18 |
/** One row decoded from a line of the `_changes` response.
  *
  * The three fields are read from same-named JSON members via `jsonFormat3`.
  *
  * @param seq value of the row's `seq` member (presumably the update sequence
  *            number — confirm against the CouchDB version in use)
  * @param id  value of the row's `id` member
  * @param doc value of the row's `doc` member, kept as a raw JSON object
  */
final case class ResultRow(seq: Int, id: String, doc: JsObject)
|
19 |
||
20 |
/** Mix-in providing an implicit spray [[Unmarshaller]] for [[ResultRow]]. */
trait ChangesMarshaller {

  /** Decodes an HTTP entity holding a single JSON object into a [[ResultRow]]. */
  implicit object ChangesMarshaller extends Unmarshaller[ResultRow] {
    import DefaultJsonProtocol._

    // Explicit type: implicit definitions should not rely on inference.
    implicit val changesFormat: RootJsonFormat[ResultRow] = jsonFormat3(ResultRow)

    /** Parses `entity` as UTF-8 JSON and converts it to a [[ResultRow]].
      *
      * @param entity the entity whose bytes hold the JSON text
      * @return `Right(row)` on success; `Left(MalformedContent("bad json", cause))`
      *         when parsing or conversion throws a non-fatal exception
      */
    def apply(entity: HttpEntity): Deserialized[ResultRow] =
      Try {
        JsonParser(entity.asString(HttpCharsets.`UTF-8`)).asJsObject.convertTo[ResultRow]
      } match {
        case scala.util.Success(row) => Right(row)
        // Keep the underlying exception so callers can report why decoding failed.
        case scala.util.Failure(cause) => Left(MalformedContent("bad json", Some(cause)))
      }

  }
}
|
33 |
||
34 |
/** Actor that requests the `_changes` feed and forwards decoded rows.
  *
  * On `Start` it issues a GET to `${couchConf.url}/_changes` with
  * `feed=continuous`, the configured heartbeat and `include_docs=true`,
  * asking for the responses to be delivered to itself. Every received
  * [[MessageChunk]] longer than two bytes is unmarshalled to a [[ResultRow]]
  * and sent to `processor`.
  *
  * @param processor recipient of every successfully decoded [[ResultRow]]
  */
class ChangesStreamActor(processor: ActorRef) extends Actor with ActorLogging
  with ChangesMarshaller with settings {

  /** The spray-can HTTP extension actor used as transport. */
  val io: ActorRef = IO(Http)(context.system)

  // Presumably implements a member required by the `settings` trait — confirm.
  def config = context.system.settings.config

  def receive: Receive = {
    case Start =>
      val uri = Uri(s"${couchConf.url}/_changes").withQuery(
        "feed" -> "continuous",
        "heartbeat" -> couchConf.heartbeat.toMillis.toString,
        "include_docs" -> "true"
      )
      val rq = HttpRequest(HttpMethods.GET, uri = uri)
      log.debug("Sending request {}", rq)
      sendTo(io).withResponsesReceivedBy(self)(rq)

    case x @ ChunkedResponseStart(_) =>
      log.debug("Received {}", x)

    case MessageChunk(data, _) =>
      log.debug("Received data chunk {}", data)
      // NOTE(review): assumes each chunk carries exactly one complete JSON row —
      // confirm that rows are never split or batched across chunks.
      if (data.length > 2) // skip heartbeat messages
        ChangesMarshaller(data).fold(
          // Include the deserialization error instead of dropping it.
          err => log.warning("Failed to deserialize entity {}: {}", data, err),
          row => processor ! row
        )

    case Status.Failure(t) =>
      // Pass the throwable as the cause so the stack trace is logged too.
      log.error(t, "Received failure {}", t)
      t match {
        case _: ConnectionAttemptFailedException =>
          log.info("Scheduling reconnect to couchdb in {}", couchConf.reconnect)
          // Run the timer on the actor's own dispatcher rather than the global
          // execution context; `self` stays the sender as before.
          context.system.scheduler.scheduleOnce(couchConf.reconnect, self,
            Start)(context.dispatcher, self)
        case _ => // other failures are only logged
      }

    case x =>
      log.debug("Received {}", x)
  }
}
|
73 |
||
74 |
||
75 |
// vim: set ts=2 sw=2 et: |