def synchronizedTransaction[A]()

in mysql-client/src/main/scala/com/twitter/finatra/mysql/util/DAOLocks.scala [60:136]


  def synchronizedTransaction[A](
    mysqlClient: Client with Transactions,
    advisoryLockName: String,
    timeoutSeconds: Option[Int] = None,
    lockTimingStat: Stat = NullStatsReceiver.stat("nullStat")
  )(
    f: Client => Future[A]
  ): Future[A] = {

    def discardSession(sessionClient: Client with Transactions with Session): Future[Unit] = {
      sessionClient.discard()
    }

    def executeTransactionInSession(
      sessionClient: Client with Transactions with Session
    ): Future[A] = {
      sessionClient.transaction { sessionInTransaction =>
        f(sessionInTransaction)
          .onFailure(ex => error("Failed executing f() within transaction", ex))
      }
    }

    def releaseLock(sessionClient: Client with Transactions with Session): Future[Unit] = {
      releaseAdvisoryLock(sessionClient, advisoryLockName)
        .rescue {
          case NonFatal(ex) =>
            // If releasing the lock fails, that is 'ok' because the inner command succeeded so we can
            // return the result of the operation we actually care about, and because the release lock
            // failed, we'll discard the session to ensure everything is cleaned up with respect to the
            // lock.
            error(s"Failed releasing lock $advisoryLockName, discarding session", ex)
            discardSession(sessionClient)
        }
    }

    def executeWithinLock(
      sessionClient: Client with Transactions with Session
    )(
      opInLock: Client with Transactions with Session => Future[A]
    ): Future[A] = {
      debug(s"Attempting to get advisory lock [$advisoryLockName]")

      val resultFuture = for {
        _ <- Stat.timeFuture(lockTimingStat)(
          getAdvisoryLock(sessionClient, advisoryLockName, timeoutSeconds))
        result <- opInLock(sessionClient)
        _ <- releaseLock(sessionClient)
      } yield result

      resultFuture.rescue {
        case AdvisoryLockException =>
          // Failed to acquire the original lock, just log and propagate the exception
          debug(s"Failed to acquire advisory lock [$advisoryLockName]")
          Future.exception(AdvisoryLockException)
        case ex: ServerError =>
          error(s"MySql command failed", ex)
          releaseLock(sessionClient)
            .transform(_ => Future.exception(ex))
        case NonFatal(ex) =>
          // Either failed because:
          // - Failed to acquire lock due to network issue
          // - Failed to execute transaction/query due to network issue

          error("Failed due to non-MySql server error", ex)
          // Try and release the lock, and then discard the session regardless
          releaseLock(sessionClient)
            .flatMap(_ => discardSession(sessionClient))
            .transform(_ => Future.exception(ex))
      }
    }

    mysqlClient.session { sessionClient =>
      executeWithinLock(sessionClient) { sessionClientInLock =>
        executeTransactionInSession(sessionClientInLock)
      }
    }
  }