我正在研究.net核心Web应用程序。我想听我的PostgreSQL数据库。如果 table 上有任何变化,我必须得到。
因此,根据我的研究,我必须使用SignalR Core。我用SignalR做过一些示例应用程序,例如聊天应用程序,但它们都没有监听数据库。我找不到任何示例。
-是否必须在PostgreSQL数据库上触发?
-在代码方面必须是监听器吗?
-如何使用SignalR Core?
请告诉我一种方法。
非常感谢。

请您参考如下方法:

这个例子是asp.net core 3.0+的工作版本。完整代码如下。

步骤1.在PostgreSql上创建一个触发器来监听 Action

 create trigger any_after_alarm_speed after 
 insert 
 or 
 delete 
 or 
 update 
 on 
 public.alarm_speed for each row execute procedure alarm_speedf(); 

第2步。在Postgresql上创建Procedur
CREATE OR REPLACE FUNCTION public.alarm_speedf() 
RETURNS trigger 
LANGUAGE plpgsql 
AS $function$ 
BEGIN 
IF TG_OP = 'INSERT' then 
PERFORM pg_notify('notifyalarmspeed', format('INSERT %s %s', NEW.alarm_speed_id,  
NEW.alarm_speed_date)); 
ELSIF TG_OP = 'UPDATE' then 
PERFORM pg_notify('notifyalarmspeed', format('UPDATE %s %s', OLD.alarm_speed_id,  
OLD.alarm_speed_date)); 
ELSIF TG_OP = 'DELETE' then 
PERFORM pg_notify('notifyalarmspeed', format('DELETE %s %s', OLD.alarm_speed_id,  
OLD.alarm_speed_date)); 
END IF; 
RETURN NULL; 
END; 
$function$; 

步骤3.创建集线器
  public class speedalarmhub : Hub 
    { 
 
        private IMemoryCache _cache; 
       `private IHubContext<speedalarmhub> _hubContext; 
         public speedalarmhub(IMemoryCache cache, IHubContext<speedalarmhub> hubContext) 
        { 
            _cache = cache; 
            _hubContext = hubContext;  
        } 
 
        public async Task SendMessage() 
        { 
            if (!_cache.TryGetValue("SpeedAlarm", out string response)) 
            { 
                SpeedListener speedlist = new SpeedListener(_hubContext,_cache); 
                speedlist.ListenForAlarmNotifications(); 
                string jsonspeedalarm = speedlist.GetAlarmList(); 
                _cache.Set("SpeedAlarm", jsonspeedalarm); 
                await Clients.All.SendAsync("ReceiveMessage", _cache.Get("SpeedAlarm").ToString()); 
            } 
            else 
            { 
                await Clients.All.SendAsync("ReceiveMessage", _cache.Get("SpeedAlarm").ToString()); 
            } 
        } 
 
    } 

步骤4.创建监听器 Controller
 public class SpeedListener :Controller 
{ 
    private IHubContext<speedalarmhub> _hubContext; 
    private IMemoryCache _cache; 
    public SpeedListener(IHubContext<speedalarmhub> hubContext,IMemoryCache cache) 
    { 
        _hubContext = hubContext; 
        _cache = cache;  
    } 
    static string GetConnectionString() 
    { 
        var csb = new NpgsqlConnectionStringBuilder 
        { 
            Host = "yourip", 
            Database = "yourdatabase", 
            Username = "yourusername", 
            Password = "yourpassword", 
            Port = 5432, 
            KeepAlive = 30 
        }; 
        return csb.ConnectionString; 
    } 
    public void ListenForAlarmNotifications() 
    { 
        NpgsqlConnection conn = new NpgsqlConnection(GetConnectionString()); 
        conn.StateChange += conn_StateChange; 
        conn.Open(); 
        var listenCommand = conn.CreateCommand(); 
        listenCommand.CommandText = $"listen notifyalarmspeed;"; 
        listenCommand.ExecuteNonQuery(); 
        conn.Notification += PostgresNotificationReceived; 
        _hubContext.Clients.All.SendAsync(this.GetAlarmList()); 
        while (true) 
        { 
            conn.Wait(); 
        } 
    } 
    private void PostgresNotificationReceived(object sender, NpgsqlNotificationEventArgs e) 
    { 
 
        string actionName = e.Payload.ToString(); 
        string actionType = ""; 
        if (actionName.Contains("DELETE")) 
        { 
            actionType = "Delete"; 
        } 
        if (actionName.Contains("UPDATE")) 
        { 
            actionType = "Update"; 
        } 
        if (actionName.Contains("INSERT")) 
        { 
            actionType = "Insert"; 
        } 
        _hubContext.Clients.All.SendAsync("ReceiveMessage", this.GetAlarmList()); 
    } 
    public string GetAlarmList() 
    { 
        var AlarmList = new List<AlarmSpeedViewModel>(); 
        using (NpgsqlCommand sqlCmd = new NpgsqlCommand()) 
        { 
            sqlCmd.CommandType = CommandType.StoredProcedure; 
            sqlCmd.CommandText = "sp_alarm_speed_process_get"; 
            NpgsqlConnection conn = new NpgsqlConnection(GetConnectionString()); 
            conn.Open(); 
            sqlCmd.Connection = conn; 
            using (NpgsqlDataReader reader = sqlCmd.ExecuteReader()) 
            { 
                while (reader.Read()) 
                { 
                    AlarmSpeedViewModel model = new AlarmSpeedViewModel(); 
                    model.alarm_speed_id = reader.GetInt32(0); 
                  // you must fill  your model items 
                    AlarmList.Add(model); 
                } 
                reader.Close(); 
                conn.Close(); 
            } 
 
 
 
        } 
        _cache.Set("SpeedAlarm", SerializeObjectToJson(AlarmList)); 
        return _cache.Get("SpeedAlarm").ToString(); 
    } 
    public String SerializeObjectToJson(Object alarmspeed) 
    { 
        try 
        { 
            var jss = new JavaScriptSerializer(); 
            return  jss.Serialize(alarmspeed); 
        } 
        catch (Exception) { return null; } 
    } 
    private void conn_StateChange(object sender, System.Data.StateChangeEventArgs e) 
    { 
 
        _hubContext.Clients.All.SendAsync("Current State: " + e.CurrentState.ToString() + " Original State: " + e.OriginalState.ToString(), "connection state changed"); 
    } 
} 

步骤5调用集线器
<script src="~/lib/signalr.js"></script> 
 
<script type="text/javascript"> 
// Start the connection. 
var connection = new signalR.HubConnectionBuilder() 
    .withUrl('/speedalarmhub') 
    .build(); 
 
 
connection.on('ReceiveMessage', function (message) { 
 
            var encodedMsg = message; 
            // Add the message to the page. 
 
}); 
// Transport fallback functionality is now built into start. 
connection.start() 
    .then(function () { 
 
        console.log('connection started'); 
        connection.invoke('SendMessage'); 
    }) 
    .catch(error => { 
        console.error(error.message); 
    }); 



步骤6.在启动时添加以下代码配置服务
public void ConfigureServices(IServiceCollection services) 
    { 
        services.AddControllersWithViews(); 
        services.AddSignalR(); 
        services.AddMemoryCache(); 
    } 

步骤7.在Configure方法中添加以下代码
app.UseEndpoints(endpoints => 
        { 
            endpoints.MapControllerRoute( 
                name: "default", 
                pattern: "{controller=Home}/{action=Index}/{id?}"); 
              endpoints.MapHub<speedalarmhub>("/speedalarmhub"); 
        }); 


评论关闭
IT序号网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!