src/main/scala/CouchStream.scala
author Tomas Zeman <tzeman@volny.cz>
Thu, 19 Nov 2015 19:08:18 +0100
changeset 4 b25c02bd6b11
parent 0 a279a342bc78
child 5 141e76d946e3
permissions -rw-r--r--
Reconnect feature
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
0
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
     1
package couchdb.changes
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
     2
4
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
     3
import akka.actor.{ActorRef, Actor, ActorLogging, Status}
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
     4
import akka.io.IO
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
     5
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
     6
import scala.concurrent.ExecutionContext.Implicits.global
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
     7
import scala.language.postfixOps
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
     8
import scala.language.reflectiveCalls
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
     9
import scala.util.Try
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    10
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    11
import spray.can.Http
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    12
import spray.can.Http.ConnectionAttemptFailedException
b25c02bd6b11 Reconnect feature
Tomas Zeman <tzeman@volny.cz>
parents: 0
diff changeset
    13
import spray.client.pipelining._
0
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    14
import spray.httpx.unmarshalling.{MalformedContent, Unmarshaller, Deserialized}
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    15
import spray.http._
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    16
import spray.json._
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    17
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    18
/**
 * One row of a `_changes` feed, as decoded from JSON by `ChangesMarshaller`.
 *
 * Field names double as the JSON keys read by the spray-json format
 * (`jsonFormat3` derives them from the constructor parameters).
 *
 * @param seq value of the row's `seq` JSON field
 * @param id  value of the row's `id` JSON field
 * @param doc value of the row's `doc` JSON field, kept as a raw JSON object
 */
case class ResultRow(
  seq: Int,
  id: String,
  doc: JsObject
)
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    19
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    20
/**
 * Mixin that brings an implicit spray `Unmarshaller` for [[ResultRow]] into scope.
 */
trait ChangesMarshaller {

  /**
   * Unmarshaller turning an HTTP entity that holds a single JSON object into
   * a [[ResultRow]].
   */
  implicit object ChangesMarshaller extends Unmarshaller[ResultRow] {
    import DefaultJsonProtocol._

    /** spray-json format for [[ResultRow]]; JSON keys are `seq`, `id` and `doc`. */
    implicit val changesFormat: RootJsonFormat[ResultRow] = jsonFormat3(ResultRow)

    /**
     * Parses `entity` as JSON and converts it to a [[ResultRow]].
     *
     * @param entity HTTP entity whose string content is expected to be one JSON object
     * @return `Right(row)` on success; `Left(MalformedContent("bad json", Some(cause)))`
     *         when parsing or conversion throws a non-fatal exception
     */
    def apply(entity: HttpEntity): Deserialized[ResultRow] =
      Try(JsonParser(entity.asString).asJsObject.convertTo[ResultRow]) match {
        case scala.util.Success(row) =>
          Right(row)
        case scala.util.Failure(cause) =>
          // Keep the underlying exception so callers can see *why* the JSON was rejected.
          Left(MalformedContent("bad json", Some(cause)))
      }

  }
}
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    32
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    33
/**
 * Actor that requests the `_changes` feed (continuous mode, with documents
 * included) from the database at `couchConf.url` through spray-can and sends
 * every successfully decoded [[ResultRow]] to `processor`.
 *
 * Protocol handled by `receive`:
 *  - `Start`: builds and sends the HTTP request; responses are delivered to this actor.
 *  - `ChunkedResponseStart`: logged only.
 *  - `MessageChunk`: chunks longer than 2 bytes are deserialized and forwarded.
 *  - `Status.Failure`: logged; a `ConnectionAttemptFailedException` additionally
 *    schedules a new `Start` after `couchConf.reconnect`.
 *  - anything else: logged at debug level.
 *
 * @param processor recipient of the decoded change rows
 */
class ChangesStreamActor(processor: ActorRef) extends Actor with ActorLogging
  with ChangesMarshaller with settings {

  val io = IO(Http)(context.system)

  def config = context.system.settings.config

  def receive: Receive = {
    case Start =>
      val uri = Uri(s"${couchConf.url}/_changes").withQuery(
        "feed" -> "continuous",
        "heartbeat" -> couchConf.heartbeat.toMillis.toString,
        "include_docs" -> "true"
      )
      val rq = HttpRequest(HttpMethods.GET, uri = uri)
      log.debug("Sending request {}", rq)
      sendTo(io).withResponsesReceivedBy(self)(rq)

    case x@ChunkedResponseStart(_) =>
      log.debug("Received {}", x)

    case MessageChunk(data, _) =>
      log.debug("Received data chunk {}", data)
      // NOTE(review): this treats every chunk as exactly one JSON row; a chunk
      // carrying several rows or a partial row would only produce the warning
      // below - confirm against the server's chunking behaviour.
      if (data.length > 2) // skip heartbeat messages
        ChangesMarshaller(data).fold(
          err => log.warning("Failed to deserialize entity {}: {}", data, err),
          row => processor ! row
        )

    case Status.Failure(t) =>
      // Use the (cause, message) overload so the stack trace is logged too.
      log.error(t, "Received failure")
      t match {
        case _: ConnectionAttemptFailedException =>
          scheduleReconnect()
        case _ =>
          // Other failures are logged above; no reconnect is attempted for them.
      }

    case x =>
      log.debug("Received {}", x)
  }

  /** Sends `Start` to this actor again after the configured reconnect delay. */
  private def scheduleReconnect(): Unit = {
    log.info("Scheduling reconnect to couchdb in {}", couchConf.reconnect)
    // Run the timer on the actor's own dispatcher rather than the global
    // execution context; `self` stays the sender, as with the implicit lookup.
    context.system.scheduler.scheduleOnce(couchConf.reconnect, self,
      Start)(context.dispatcher, self)
  }
}
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    72
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    73
a279a342bc78 Initial import
Tomas Zeman <tzeman@volny.cz>
parents:
diff changeset
    74
// vim: set ts=2 sw=2 et: